1use std::collections::HashMap;
21
22use async_trait::async_trait;
23use futures::lock::{Mutex, MutexGuard};
24use itertools::Itertools;
25
26use super::namespace_state::NamespaceState;
27use crate::io::FileIO;
28use crate::spec::{TableMetadata, TableMetadataBuilder};
29use crate::table::Table;
30use crate::{
31 Catalog, CatalogBuilder, Error, ErrorKind, MetadataLocation, Namespace, NamespaceIdent, Result,
32 TableCommit, TableCreation, TableIdent,
33};
34
35pub const MEMORY_CATALOG_WAREHOUSE: &str = "warehouse";
37
38const LOCATION: &str = "location";
40
41#[derive(Debug)]
43pub struct MemoryCatalogBuilder(MemoryCatalogConfig);
44
45impl Default for MemoryCatalogBuilder {
46 fn default() -> Self {
47 Self(MemoryCatalogConfig {
48 name: None,
49 warehouse: "".to_string(),
50 props: HashMap::new(),
51 })
52 }
53}
54
55impl CatalogBuilder for MemoryCatalogBuilder {
56 type C = MemoryCatalog;
57
58 fn load(
59 mut self,
60 name: impl Into<String>,
61 props: HashMap<String, String>,
62 ) -> impl Future<Output = Result<Self::C>> + Send {
63 self.0.name = Some(name.into());
64
65 if props.contains_key(MEMORY_CATALOG_WAREHOUSE) {
66 self.0.warehouse = props
67 .get(MEMORY_CATALOG_WAREHOUSE)
68 .cloned()
69 .unwrap_or_default()
70 }
71
72 self.0.props = props
74 .into_iter()
75 .filter(|(k, _)| k != MEMORY_CATALOG_WAREHOUSE)
76 .collect();
77
78 let result = {
79 if self.0.name.is_none() {
80 Err(Error::new(
81 ErrorKind::DataInvalid,
82 "Catalog name is required",
83 ))
84 } else if self.0.warehouse.is_empty() {
85 Err(Error::new(
86 ErrorKind::DataInvalid,
87 "Catalog warehouse is required",
88 ))
89 } else {
90 MemoryCatalog::new(self.0)
91 }
92 };
93
94 std::future::ready(result)
95 }
96}
97
98#[derive(Clone, Debug)]
99pub(crate) struct MemoryCatalogConfig {
100 name: Option<String>,
101 warehouse: String,
102 props: HashMap<String, String>,
103}
104
105#[derive(Debug)]
107pub struct MemoryCatalog {
108 root_namespace_state: Mutex<NamespaceState>,
109 file_io: FileIO,
110 warehouse_location: String,
111}
112
113impl MemoryCatalog {
114 fn new(config: MemoryCatalogConfig) -> Result<Self> {
116 Ok(Self {
117 root_namespace_state: Mutex::new(NamespaceState::default()),
118 file_io: FileIO::from_path(&config.warehouse)?
119 .with_props(config.props)
120 .build()?,
121 warehouse_location: config.warehouse,
122 })
123 }
124
125 async fn load_table_from_locked_state(
127 &self,
128 table_ident: &TableIdent,
129 root_namespace_state: &MutexGuard<'_, NamespaceState>,
130 ) -> Result<Table> {
131 let metadata_location = root_namespace_state.get_existing_table_location(table_ident)?;
132 let metadata = TableMetadata::read_from(&self.file_io, metadata_location).await?;
133
134 Table::builder()
135 .identifier(table_ident.clone())
136 .metadata(metadata)
137 .metadata_location(metadata_location.to_string())
138 .file_io(self.file_io.clone())
139 .build()
140 }
141}
142
143#[async_trait]
144impl Catalog for MemoryCatalog {
145 async fn list_namespaces(
147 &self,
148 maybe_parent: Option<&NamespaceIdent>,
149 ) -> Result<Vec<NamespaceIdent>> {
150 let root_namespace_state = self.root_namespace_state.lock().await;
151
152 match maybe_parent {
153 None => {
154 let namespaces = root_namespace_state
155 .list_top_level_namespaces()
156 .into_iter()
157 .map(|str| NamespaceIdent::new(str.to_string()))
158 .collect_vec();
159
160 Ok(namespaces)
161 }
162 Some(parent_namespace_ident) => {
163 let namespaces = root_namespace_state
164 .list_namespaces_under(parent_namespace_ident)?
165 .into_iter()
166 .map(|name| NamespaceIdent::new(name.to_string()))
167 .collect_vec();
168
169 Ok(namespaces)
170 }
171 }
172 }
173
174 async fn create_namespace(
176 &self,
177 namespace_ident: &NamespaceIdent,
178 properties: HashMap<String, String>,
179 ) -> Result<Namespace> {
180 let mut root_namespace_state = self.root_namespace_state.lock().await;
181
182 root_namespace_state.insert_new_namespace(namespace_ident, properties.clone())?;
183 let namespace = Namespace::with_properties(namespace_ident.clone(), properties);
184
185 Ok(namespace)
186 }
187
188 async fn get_namespace(&self, namespace_ident: &NamespaceIdent) -> Result<Namespace> {
190 let root_namespace_state = self.root_namespace_state.lock().await;
191
192 let namespace = Namespace::with_properties(
193 namespace_ident.clone(),
194 root_namespace_state
195 .get_properties(namespace_ident)?
196 .clone(),
197 );
198
199 Ok(namespace)
200 }
201
202 async fn namespace_exists(&self, namespace_ident: &NamespaceIdent) -> Result<bool> {
204 let guarded_namespaces = self.root_namespace_state.lock().await;
205
206 Ok(guarded_namespaces.namespace_exists(namespace_ident))
207 }
208
209 async fn update_namespace(
215 &self,
216 namespace_ident: &NamespaceIdent,
217 properties: HashMap<String, String>,
218 ) -> Result<()> {
219 let mut root_namespace_state = self.root_namespace_state.lock().await;
220
221 root_namespace_state.replace_properties(namespace_ident, properties)
222 }
223
224 async fn drop_namespace(&self, namespace_ident: &NamespaceIdent) -> Result<()> {
226 let mut root_namespace_state = self.root_namespace_state.lock().await;
227
228 root_namespace_state.remove_existing_namespace(namespace_ident)
229 }
230
231 async fn list_tables(&self, namespace_ident: &NamespaceIdent) -> Result<Vec<TableIdent>> {
233 let root_namespace_state = self.root_namespace_state.lock().await;
234
235 let table_names = root_namespace_state.list_tables(namespace_ident)?;
236 let table_idents = table_names
237 .into_iter()
238 .map(|table_name| TableIdent::new(namespace_ident.clone(), table_name.clone()))
239 .collect_vec();
240
241 Ok(table_idents)
242 }
243
244 async fn create_table(
246 &self,
247 namespace_ident: &NamespaceIdent,
248 table_creation: TableCreation,
249 ) -> Result<Table> {
250 let mut root_namespace_state = self.root_namespace_state.lock().await;
251
252 let table_name = table_creation.name.clone();
253 let table_ident = TableIdent::new(namespace_ident.clone(), table_name);
254
255 let (table_creation, location) = match table_creation.location.clone() {
256 Some(location) => (table_creation, location),
257 None => {
258 let namespace_properties = root_namespace_state.get_properties(namespace_ident)?;
259 let location_prefix = match namespace_properties.get(LOCATION) {
260 Some(namespace_location) => namespace_location.clone(),
261 None => format!("{}/{}", self.warehouse_location, namespace_ident.join("/")),
262 };
263
264 let location = format!("{}/{}", location_prefix, table_ident.name());
265
266 let new_table_creation = TableCreation {
267 location: Some(location.clone()),
268 ..table_creation
269 };
270
271 (new_table_creation, location)
272 }
273 };
274
275 let metadata = TableMetadataBuilder::from_table_creation(table_creation)?
276 .build()?
277 .metadata;
278 let metadata_location = MetadataLocation::new_with_table_location(location).to_string();
279
280 metadata.write_to(&self.file_io, &metadata_location).await?;
281
282 root_namespace_state.insert_new_table(&table_ident, metadata_location.clone())?;
283
284 Table::builder()
285 .file_io(self.file_io.clone())
286 .metadata_location(metadata_location)
287 .metadata(metadata)
288 .identifier(table_ident)
289 .build()
290 }
291
292 async fn load_table(&self, table_ident: &TableIdent) -> Result<Table> {
294 let root_namespace_state = self.root_namespace_state.lock().await;
295
296 self.load_table_from_locked_state(table_ident, &root_namespace_state)
297 .await
298 }
299
300 async fn drop_table(&self, table_ident: &TableIdent) -> Result<()> {
302 let mut root_namespace_state = self.root_namespace_state.lock().await;
303
304 let metadata_location = root_namespace_state.remove_existing_table(table_ident)?;
305 self.file_io.delete(&metadata_location).await
306 }
307
308 async fn table_exists(&self, table_ident: &TableIdent) -> Result<bool> {
310 let root_namespace_state = self.root_namespace_state.lock().await;
311
312 root_namespace_state.table_exists(table_ident)
313 }
314
315 async fn rename_table(
317 &self,
318 src_table_ident: &TableIdent,
319 dst_table_ident: &TableIdent,
320 ) -> Result<()> {
321 let mut root_namespace_state = self.root_namespace_state.lock().await;
322
323 let mut new_root_namespace_state = root_namespace_state.clone();
324 let metadata_location = new_root_namespace_state
325 .get_existing_table_location(src_table_ident)?
326 .clone();
327 new_root_namespace_state.remove_existing_table(src_table_ident)?;
328 new_root_namespace_state.insert_new_table(dst_table_ident, metadata_location)?;
329 *root_namespace_state = new_root_namespace_state;
330
331 Ok(())
332 }
333
334 async fn register_table(
335 &self,
336 table_ident: &TableIdent,
337 metadata_location: String,
338 ) -> Result<Table> {
339 let mut root_namespace_state = self.root_namespace_state.lock().await;
340 root_namespace_state.insert_new_table(&table_ident.clone(), metadata_location.clone())?;
341
342 let metadata = TableMetadata::read_from(&self.file_io, &metadata_location).await?;
343
344 Table::builder()
345 .file_io(self.file_io.clone())
346 .metadata_location(metadata_location)
347 .metadata(metadata)
348 .identifier(table_ident.clone())
349 .build()
350 }
351
352 async fn update_table(&self, commit: TableCommit) -> Result<Table> {
354 let mut root_namespace_state = self.root_namespace_state.lock().await;
355
356 let current_table = self
357 .load_table_from_locked_state(commit.identifier(), &root_namespace_state)
358 .await?;
359
360 let staged_table = commit.apply(current_table)?;
362
363 staged_table
365 .metadata()
366 .write_to(
367 staged_table.file_io(),
368 staged_table.metadata_location_result()?,
369 )
370 .await?;
371
372 let updated_table = root_namespace_state.commit_table_update(staged_table)?;
374
375 Ok(updated_table)
376 }
377}
378
379#[cfg(test)]
380mod tests {
381 use std::collections::HashSet;
382 use std::hash::Hash;
383 use std::iter::FromIterator;
384 use std::vec;
385
386 use regex::Regex;
387 use tempfile::TempDir;
388
389 use super::*;
390 use crate::io::FileIOBuilder;
391 use crate::spec::{NestedField, PartitionSpec, PrimitiveType, Schema, SortOrder, Type};
392 use crate::transaction::{ApplyTransactionAction, Transaction};
393
394 fn temp_path() -> String {
395 let temp_dir = TempDir::new().unwrap();
396 temp_dir.path().to_str().unwrap().to_string()
397 }
398
399 async fn new_memory_catalog() -> impl Catalog {
400 let warehouse_location = temp_path();
401 MemoryCatalogBuilder::default()
402 .load(
403 "memory",
404 HashMap::from([(MEMORY_CATALOG_WAREHOUSE.to_string(), warehouse_location)]),
405 )
406 .await
407 .unwrap()
408 }
409
410 async fn create_namespace<C: Catalog>(catalog: &C, namespace_ident: &NamespaceIdent) {
411 let _ = catalog
412 .create_namespace(namespace_ident, HashMap::new())
413 .await
414 .unwrap();
415 }
416
417 async fn create_namespaces<C: Catalog>(catalog: &C, namespace_idents: &Vec<&NamespaceIdent>) {
418 for namespace_ident in namespace_idents {
419 let _ = create_namespace(catalog, namespace_ident).await;
420 }
421 }
422
423 fn to_set<T: Eq + Hash>(vec: Vec<T>) -> HashSet<T> {
424 HashSet::from_iter(vec)
425 }
426
427 fn simple_table_schema() -> Schema {
428 Schema::builder()
429 .with_fields(vec![
430 NestedField::required(1, "foo", Type::Primitive(PrimitiveType::Int)).into(),
431 ])
432 .build()
433 .unwrap()
434 }
435
436 async fn create_table<C: Catalog>(catalog: &C, table_ident: &TableIdent) -> Table {
437 catalog
438 .create_table(
439 &table_ident.namespace,
440 TableCreation::builder()
441 .name(table_ident.name().into())
442 .schema(simple_table_schema())
443 .build(),
444 )
445 .await
446 .unwrap()
447 }
448
449 async fn create_tables<C: Catalog>(catalog: &C, table_idents: Vec<&TableIdent>) {
450 for table_ident in table_idents {
451 create_table(catalog, table_ident).await;
452 }
453 }
454
455 async fn create_table_with_namespace<C: Catalog>(catalog: &C) -> Table {
456 let namespace_ident = NamespaceIdent::new("abc".into());
457 create_namespace(catalog, &namespace_ident).await;
458
459 let table_ident = TableIdent::new(namespace_ident, "test".to_string());
460 create_table(catalog, &table_ident).await
461 }
462
463 fn assert_table_eq(table: &Table, expected_table_ident: &TableIdent, expected_schema: &Schema) {
464 assert_eq!(table.identifier(), expected_table_ident);
465
466 let metadata = table.metadata();
467
468 assert_eq!(metadata.current_schema().as_ref(), expected_schema);
469
470 let expected_partition_spec = PartitionSpec::builder((*expected_schema).clone())
471 .with_spec_id(0)
472 .build()
473 .unwrap();
474
475 assert_eq!(
476 metadata
477 .partition_specs_iter()
478 .map(|p| p.as_ref())
479 .collect_vec(),
480 vec![&expected_partition_spec]
481 );
482
483 let expected_sorted_order = SortOrder::builder()
484 .with_order_id(0)
485 .with_fields(vec![])
486 .build(expected_schema)
487 .unwrap();
488
489 assert_eq!(
490 metadata
491 .sort_orders_iter()
492 .map(|s| s.as_ref())
493 .collect_vec(),
494 vec![&expected_sorted_order]
495 );
496
497 assert_eq!(metadata.properties(), &HashMap::new());
498
499 assert!(!table.readonly());
500 }
501
502 const UUID_REGEX_STR: &str = "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}";
503
504 fn assert_table_metadata_location_matches(table: &Table, regex_str: &str) {
505 let actual = table.metadata_location().unwrap().to_string();
506 let regex = Regex::new(regex_str).unwrap();
507 assert!(
508 regex.is_match(&actual),
509 "Expected metadata location to match regex, but got location: {} and regex: {}",
510 actual,
511 regex
512 )
513 }
514
515 #[tokio::test]
516 async fn test_list_namespaces_returns_empty_vector() {
517 let catalog = new_memory_catalog().await;
518
519 assert_eq!(catalog.list_namespaces(None).await.unwrap(), vec![]);
520 }
521
522 #[tokio::test]
523 async fn test_list_namespaces_returns_single_namespace() {
524 let catalog = new_memory_catalog().await;
525 let namespace_ident = NamespaceIdent::new("abc".into());
526 create_namespace(&catalog, &namespace_ident).await;
527
528 assert_eq!(catalog.list_namespaces(None).await.unwrap(), vec![
529 namespace_ident
530 ]);
531 }
532
533 #[tokio::test]
534 async fn test_list_namespaces_returns_multiple_namespaces() {
535 let catalog = new_memory_catalog().await;
536 let namespace_ident_1 = NamespaceIdent::new("a".into());
537 let namespace_ident_2 = NamespaceIdent::new("b".into());
538 create_namespaces(&catalog, &vec![&namespace_ident_1, &namespace_ident_2]).await;
539
540 assert_eq!(
541 to_set(catalog.list_namespaces(None).await.unwrap()),
542 to_set(vec![namespace_ident_1, namespace_ident_2])
543 );
544 }
545
546 #[tokio::test]
547 async fn test_list_namespaces_returns_only_top_level_namespaces() {
548 let catalog = new_memory_catalog().await;
549 let namespace_ident_1 = NamespaceIdent::new("a".into());
550 let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
551 let namespace_ident_3 = NamespaceIdent::new("b".into());
552 create_namespaces(&catalog, &vec![
553 &namespace_ident_1,
554 &namespace_ident_2,
555 &namespace_ident_3,
556 ])
557 .await;
558
559 assert_eq!(
560 to_set(catalog.list_namespaces(None).await.unwrap()),
561 to_set(vec![namespace_ident_1, namespace_ident_3])
562 );
563 }
564
565 #[tokio::test]
566 async fn test_list_namespaces_returns_no_namespaces_under_parent() {
567 let catalog = new_memory_catalog().await;
568 let namespace_ident_1 = NamespaceIdent::new("a".into());
569 let namespace_ident_2 = NamespaceIdent::new("b".into());
570 create_namespaces(&catalog, &vec![&namespace_ident_1, &namespace_ident_2]).await;
571
572 assert_eq!(
573 catalog
574 .list_namespaces(Some(&namespace_ident_1))
575 .await
576 .unwrap(),
577 vec![]
578 );
579 }
580
581 #[tokio::test]
582 async fn test_list_namespaces_returns_namespace_under_parent() {
583 let catalog = new_memory_catalog().await;
584 let namespace_ident_1 = NamespaceIdent::new("a".into());
585 let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
586 let namespace_ident_3 = NamespaceIdent::new("c".into());
587 create_namespaces(&catalog, &vec![
588 &namespace_ident_1,
589 &namespace_ident_2,
590 &namespace_ident_3,
591 ])
592 .await;
593
594 assert_eq!(
595 to_set(catalog.list_namespaces(None).await.unwrap()),
596 to_set(vec![namespace_ident_1.clone(), namespace_ident_3])
597 );
598
599 assert_eq!(
600 catalog
601 .list_namespaces(Some(&namespace_ident_1))
602 .await
603 .unwrap(),
604 vec![NamespaceIdent::new("b".into())]
605 );
606 }
607
608 #[tokio::test]
609 async fn test_list_namespaces_returns_multiple_namespaces_under_parent() {
610 let catalog = new_memory_catalog().await;
611 let namespace_ident_1 = NamespaceIdent::new("a".to_string());
612 let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "a"]).unwrap();
613 let namespace_ident_3 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
614 let namespace_ident_4 = NamespaceIdent::from_strs(vec!["a", "c"]).unwrap();
615 let namespace_ident_5 = NamespaceIdent::new("b".into());
616 create_namespaces(&catalog, &vec![
617 &namespace_ident_1,
618 &namespace_ident_2,
619 &namespace_ident_3,
620 &namespace_ident_4,
621 &namespace_ident_5,
622 ])
623 .await;
624
625 assert_eq!(
626 to_set(
627 catalog
628 .list_namespaces(Some(&namespace_ident_1))
629 .await
630 .unwrap()
631 ),
632 to_set(vec![
633 NamespaceIdent::new("a".into()),
634 NamespaceIdent::new("b".into()),
635 NamespaceIdent::new("c".into()),
636 ])
637 );
638 }
639
640 #[tokio::test]
641 async fn test_namespace_exists_returns_false() {
642 let catalog = new_memory_catalog().await;
643 let namespace_ident = NamespaceIdent::new("a".into());
644 create_namespace(&catalog, &namespace_ident).await;
645
646 assert!(
647 !catalog
648 .namespace_exists(&NamespaceIdent::new("b".into()))
649 .await
650 .unwrap()
651 );
652 }
653
654 #[tokio::test]
655 async fn test_namespace_exists_returns_true() {
656 let catalog = new_memory_catalog().await;
657 let namespace_ident = NamespaceIdent::new("a".into());
658 create_namespace(&catalog, &namespace_ident).await;
659
660 assert!(catalog.namespace_exists(&namespace_ident).await.unwrap());
661 }
662
663 #[tokio::test]
664 async fn test_create_namespace_with_empty_properties() {
665 let catalog = new_memory_catalog().await;
666 let namespace_ident = NamespaceIdent::new("a".into());
667
668 assert_eq!(
669 catalog
670 .create_namespace(&namespace_ident, HashMap::new())
671 .await
672 .unwrap(),
673 Namespace::new(namespace_ident.clone())
674 );
675
676 assert_eq!(
677 catalog.get_namespace(&namespace_ident).await.unwrap(),
678 Namespace::with_properties(namespace_ident, HashMap::new())
679 );
680 }
681
682 #[tokio::test]
683 async fn test_create_namespace_with_properties() {
684 let catalog = new_memory_catalog().await;
685 let namespace_ident = NamespaceIdent::new("abc".into());
686
687 let mut properties: HashMap<String, String> = HashMap::new();
688 properties.insert("k".into(), "v".into());
689
690 assert_eq!(
691 catalog
692 .create_namespace(&namespace_ident, properties.clone())
693 .await
694 .unwrap(),
695 Namespace::with_properties(namespace_ident.clone(), properties.clone())
696 );
697
698 assert_eq!(
699 catalog.get_namespace(&namespace_ident).await.unwrap(),
700 Namespace::with_properties(namespace_ident, properties)
701 );
702 }
703
704 #[tokio::test]
705 async fn test_create_namespace_throws_error_if_namespace_already_exists() {
706 let catalog = new_memory_catalog().await;
707 let namespace_ident = NamespaceIdent::new("a".into());
708 create_namespace(&catalog, &namespace_ident).await;
709
710 assert_eq!(
711 catalog
712 .create_namespace(&namespace_ident, HashMap::new())
713 .await
714 .unwrap_err()
715 .to_string(),
716 format!(
717 "NamespaceAlreadyExists => Cannot create namespace {:?}. Namespace already exists.",
718 &namespace_ident
719 )
720 );
721
722 assert_eq!(
723 catalog.get_namespace(&namespace_ident).await.unwrap(),
724 Namespace::with_properties(namespace_ident, HashMap::new())
725 );
726 }
727
728 #[tokio::test]
729 async fn test_create_nested_namespace() {
730 let catalog = new_memory_catalog().await;
731 let parent_namespace_ident = NamespaceIdent::new("a".into());
732 create_namespace(&catalog, &parent_namespace_ident).await;
733
734 let child_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
735
736 assert_eq!(
737 catalog
738 .create_namespace(&child_namespace_ident, HashMap::new())
739 .await
740 .unwrap(),
741 Namespace::new(child_namespace_ident.clone())
742 );
743
744 assert_eq!(
745 catalog.get_namespace(&child_namespace_ident).await.unwrap(),
746 Namespace::with_properties(child_namespace_ident, HashMap::new())
747 );
748 }
749
750 #[tokio::test]
751 async fn test_create_deeply_nested_namespace() {
752 let catalog = new_memory_catalog().await;
753 let namespace_ident_a = NamespaceIdent::new("a".into());
754 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
755 create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
756
757 let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
758
759 assert_eq!(
760 catalog
761 .create_namespace(&namespace_ident_a_b_c, HashMap::new())
762 .await
763 .unwrap(),
764 Namespace::new(namespace_ident_a_b_c.clone())
765 );
766
767 assert_eq!(
768 catalog.get_namespace(&namespace_ident_a_b_c).await.unwrap(),
769 Namespace::with_properties(namespace_ident_a_b_c, HashMap::new())
770 );
771 }
772
773 #[tokio::test]
774 async fn test_create_nested_namespace_throws_error_if_top_level_namespace_doesnt_exist() {
775 let catalog = new_memory_catalog().await;
776
777 let nested_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
778
779 assert_eq!(
780 catalog
781 .create_namespace(&nested_namespace_ident, HashMap::new())
782 .await
783 .unwrap_err()
784 .to_string(),
785 format!(
786 "NamespaceNotFound => No such namespace: {:?}",
787 NamespaceIdent::new("a".into())
788 )
789 );
790
791 assert_eq!(catalog.list_namespaces(None).await.unwrap(), vec![]);
792 }
793
794 #[tokio::test]
795 async fn test_create_deeply_nested_namespace_throws_error_if_intermediate_namespace_doesnt_exist()
796 {
797 let catalog = new_memory_catalog().await;
798
799 let namespace_ident_a = NamespaceIdent::new("a".into());
800 create_namespace(&catalog, &namespace_ident_a).await;
801
802 let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
803
804 assert_eq!(
805 catalog
806 .create_namespace(&namespace_ident_a_b_c, HashMap::new())
807 .await
808 .unwrap_err()
809 .to_string(),
810 format!(
811 "NamespaceNotFound => No such namespace: {:?}",
812 NamespaceIdent::from_strs(vec!["a", "b"]).unwrap()
813 )
814 );
815
816 assert_eq!(catalog.list_namespaces(None).await.unwrap(), vec![
817 namespace_ident_a.clone()
818 ]);
819
820 assert_eq!(
821 catalog
822 .list_namespaces(Some(&namespace_ident_a))
823 .await
824 .unwrap(),
825 vec![]
826 );
827 }
828
829 #[tokio::test]
830 async fn test_get_namespace() {
831 let catalog = new_memory_catalog().await;
832 let namespace_ident = NamespaceIdent::new("abc".into());
833
834 let mut properties: HashMap<String, String> = HashMap::new();
835 properties.insert("k".into(), "v".into());
836 let _ = catalog
837 .create_namespace(&namespace_ident, properties.clone())
838 .await
839 .unwrap();
840
841 assert_eq!(
842 catalog.get_namespace(&namespace_ident).await.unwrap(),
843 Namespace::with_properties(namespace_ident, properties)
844 )
845 }
846
847 #[tokio::test]
848 async fn test_get_nested_namespace() {
849 let catalog = new_memory_catalog().await;
850 let namespace_ident_a = NamespaceIdent::new("a".into());
851 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
852 create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
853
854 assert_eq!(
855 catalog.get_namespace(&namespace_ident_a_b).await.unwrap(),
856 Namespace::with_properties(namespace_ident_a_b, HashMap::new())
857 );
858 }
859
860 #[tokio::test]
861 async fn test_get_deeply_nested_namespace() {
862 let catalog = new_memory_catalog().await;
863 let namespace_ident_a = NamespaceIdent::new("a".into());
864 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
865 let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
866 create_namespaces(&catalog, &vec![
867 &namespace_ident_a,
868 &namespace_ident_a_b,
869 &namespace_ident_a_b_c,
870 ])
871 .await;
872
873 assert_eq!(
874 catalog.get_namespace(&namespace_ident_a_b_c).await.unwrap(),
875 Namespace::with_properties(namespace_ident_a_b_c, HashMap::new())
876 );
877 }
878
879 #[tokio::test]
880 async fn test_get_namespace_throws_error_if_namespace_doesnt_exist() {
881 let catalog = new_memory_catalog().await;
882 create_namespace(&catalog, &NamespaceIdent::new("a".into())).await;
883
884 let non_existent_namespace_ident = NamespaceIdent::new("b".into());
885 assert_eq!(
886 catalog
887 .get_namespace(&non_existent_namespace_ident)
888 .await
889 .unwrap_err()
890 .to_string(),
891 format!(
892 "NamespaceNotFound => No such namespace: {:?}",
893 non_existent_namespace_ident
894 )
895 )
896 }
897
898 #[tokio::test]
899 async fn test_update_namespace() {
900 let catalog = new_memory_catalog().await;
901 let namespace_ident = NamespaceIdent::new("abc".into());
902 create_namespace(&catalog, &namespace_ident).await;
903
904 let mut new_properties: HashMap<String, String> = HashMap::new();
905 new_properties.insert("k".into(), "v".into());
906
907 catalog
908 .update_namespace(&namespace_ident, new_properties.clone())
909 .await
910 .unwrap();
911
912 assert_eq!(
913 catalog.get_namespace(&namespace_ident).await.unwrap(),
914 Namespace::with_properties(namespace_ident, new_properties)
915 )
916 }
917
918 #[tokio::test]
919 async fn test_update_nested_namespace() {
920 let catalog = new_memory_catalog().await;
921 let namespace_ident_a = NamespaceIdent::new("a".into());
922 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
923 create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
924
925 let mut new_properties = HashMap::new();
926 new_properties.insert("k".into(), "v".into());
927
928 catalog
929 .update_namespace(&namespace_ident_a_b, new_properties.clone())
930 .await
931 .unwrap();
932
933 assert_eq!(
934 catalog.get_namespace(&namespace_ident_a_b).await.unwrap(),
935 Namespace::with_properties(namespace_ident_a_b, new_properties)
936 );
937 }
938
939 #[tokio::test]
940 async fn test_update_deeply_nested_namespace() {
941 let catalog = new_memory_catalog().await;
942 let namespace_ident_a = NamespaceIdent::new("a".into());
943 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
944 let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
945 create_namespaces(&catalog, &vec![
946 &namespace_ident_a,
947 &namespace_ident_a_b,
948 &namespace_ident_a_b_c,
949 ])
950 .await;
951
952 let mut new_properties = HashMap::new();
953 new_properties.insert("k".into(), "v".into());
954
955 catalog
956 .update_namespace(&namespace_ident_a_b_c, new_properties.clone())
957 .await
958 .unwrap();
959
960 assert_eq!(
961 catalog.get_namespace(&namespace_ident_a_b_c).await.unwrap(),
962 Namespace::with_properties(namespace_ident_a_b_c, new_properties)
963 );
964 }
965
966 #[tokio::test]
967 async fn test_update_namespace_throws_error_if_namespace_doesnt_exist() {
968 let catalog = new_memory_catalog().await;
969 create_namespace(&catalog, &NamespaceIdent::new("abc".into())).await;
970
971 let non_existent_namespace_ident = NamespaceIdent::new("def".into());
972 assert_eq!(
973 catalog
974 .update_namespace(&non_existent_namespace_ident, HashMap::new())
975 .await
976 .unwrap_err()
977 .to_string(),
978 format!(
979 "NamespaceNotFound => No such namespace: {:?}",
980 non_existent_namespace_ident
981 )
982 )
983 }
984
985 #[tokio::test]
986 async fn test_drop_namespace() {
987 let catalog = new_memory_catalog().await;
988 let namespace_ident = NamespaceIdent::new("abc".into());
989 create_namespace(&catalog, &namespace_ident).await;
990
991 catalog.drop_namespace(&namespace_ident).await.unwrap();
992
993 assert!(!catalog.namespace_exists(&namespace_ident).await.unwrap())
994 }
995
996 #[tokio::test]
997 async fn test_drop_nested_namespace() {
998 let catalog = new_memory_catalog().await;
999 let namespace_ident_a = NamespaceIdent::new("a".into());
1000 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1001 create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1002
1003 catalog.drop_namespace(&namespace_ident_a_b).await.unwrap();
1004
1005 assert!(
1006 !catalog
1007 .namespace_exists(&namespace_ident_a_b)
1008 .await
1009 .unwrap()
1010 );
1011
1012 assert!(catalog.namespace_exists(&namespace_ident_a).await.unwrap());
1013 }
1014
1015 #[tokio::test]
1016 async fn test_drop_deeply_nested_namespace() {
1017 let catalog = new_memory_catalog().await;
1018 let namespace_ident_a = NamespaceIdent::new("a".into());
1019 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1020 let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
1021 create_namespaces(&catalog, &vec![
1022 &namespace_ident_a,
1023 &namespace_ident_a_b,
1024 &namespace_ident_a_b_c,
1025 ])
1026 .await;
1027
1028 catalog
1029 .drop_namespace(&namespace_ident_a_b_c)
1030 .await
1031 .unwrap();
1032
1033 assert!(
1034 !catalog
1035 .namespace_exists(&namespace_ident_a_b_c)
1036 .await
1037 .unwrap()
1038 );
1039
1040 assert!(
1041 catalog
1042 .namespace_exists(&namespace_ident_a_b)
1043 .await
1044 .unwrap()
1045 );
1046
1047 assert!(catalog.namespace_exists(&namespace_ident_a).await.unwrap());
1048 }
1049
1050 #[tokio::test]
1051 async fn test_drop_namespace_throws_error_if_namespace_doesnt_exist() {
1052 let catalog = new_memory_catalog().await;
1053
1054 let non_existent_namespace_ident = NamespaceIdent::new("abc".into());
1055 assert_eq!(
1056 catalog
1057 .drop_namespace(&non_existent_namespace_ident)
1058 .await
1059 .unwrap_err()
1060 .to_string(),
1061 format!(
1062 "NamespaceNotFound => No such namespace: {:?}",
1063 non_existent_namespace_ident
1064 )
1065 )
1066 }
1067
1068 #[tokio::test]
1069 async fn test_drop_namespace_throws_error_if_nested_namespace_doesnt_exist() {
1070 let catalog = new_memory_catalog().await;
1071 create_namespace(&catalog, &NamespaceIdent::new("a".into())).await;
1072
1073 let non_existent_namespace_ident =
1074 NamespaceIdent::from_vec(vec!["a".into(), "b".into()]).unwrap();
1075 assert_eq!(
1076 catalog
1077 .drop_namespace(&non_existent_namespace_ident)
1078 .await
1079 .unwrap_err()
1080 .to_string(),
1081 format!(
1082 "NamespaceNotFound => No such namespace: {:?}",
1083 non_existent_namespace_ident
1084 )
1085 )
1086 }
1087
1088 #[tokio::test]
1089 async fn test_dropping_a_namespace_also_drops_namespaces_nested_under_that_one() {
1090 let catalog = new_memory_catalog().await;
1091 let namespace_ident_a = NamespaceIdent::new("a".into());
1092 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1093 create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1094
1095 catalog.drop_namespace(&namespace_ident_a).await.unwrap();
1096
1097 assert!(!catalog.namespace_exists(&namespace_ident_a).await.unwrap());
1098
1099 assert!(
1100 !catalog
1101 .namespace_exists(&namespace_ident_a_b)
1102 .await
1103 .unwrap()
1104 );
1105 }
1106
1107 #[tokio::test]
1108 async fn test_create_table_with_location() {
1109 let tmp_dir = TempDir::new().unwrap();
1110 let catalog = new_memory_catalog().await;
1111 let namespace_ident = NamespaceIdent::new("a".into());
1112 create_namespace(&catalog, &namespace_ident).await;
1113
1114 let table_name = "abc";
1115 let location = tmp_dir.path().to_str().unwrap().to_string();
1116 let table_creation = TableCreation::builder()
1117 .name(table_name.into())
1118 .location(location.clone())
1119 .schema(simple_table_schema())
1120 .build();
1121
1122 let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1123
1124 assert_table_eq(
1125 &catalog
1126 .create_table(&namespace_ident, table_creation)
1127 .await
1128 .unwrap(),
1129 &expected_table_ident,
1130 &simple_table_schema(),
1131 );
1132
1133 let table = catalog.load_table(&expected_table_ident).await.unwrap();
1134
1135 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1136
1137 assert!(
1138 table
1139 .metadata_location()
1140 .unwrap()
1141 .to_string()
1142 .starts_with(&location)
1143 )
1144 }
1145
1146 #[tokio::test]
1147 async fn test_create_table_falls_back_to_namespace_location_if_table_location_is_missing() {
1148 let warehouse_location = temp_path();
1149 let catalog = MemoryCatalogBuilder::default()
1150 .load(
1151 "memory",
1152 HashMap::from([(
1153 MEMORY_CATALOG_WAREHOUSE.to_string(),
1154 warehouse_location.clone(),
1155 )]),
1156 )
1157 .await
1158 .unwrap();
1159
1160 let namespace_ident = NamespaceIdent::new("a".into());
1161 let mut namespace_properties = HashMap::new();
1162 let namespace_location = temp_path();
1163 namespace_properties.insert(LOCATION.to_string(), namespace_location.to_string());
1164 catalog
1165 .create_namespace(&namespace_ident, namespace_properties)
1166 .await
1167 .unwrap();
1168
1169 let table_name = "tbl1";
1170 let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1171 let expected_table_metadata_location_regex = format!(
1172 "^{}/tbl1/metadata/00000-{}.metadata.json$",
1173 namespace_location, UUID_REGEX_STR,
1174 );
1175
1176 let table = catalog
1177 .create_table(
1178 &namespace_ident,
1179 TableCreation::builder()
1180 .name(table_name.into())
1181 .schema(simple_table_schema())
1182 .build(),
1184 )
1185 .await
1186 .unwrap();
1187 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1188 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1189
1190 let table = catalog.load_table(&expected_table_ident).await.unwrap();
1191 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1192 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1193 }
1194
1195 #[tokio::test]
1196 async fn test_create_table_in_nested_namespace_falls_back_to_nested_namespace_location_if_table_location_is_missing()
1197 {
1198 let warehouse_location = temp_path();
1199 let catalog = MemoryCatalogBuilder::default()
1200 .load(
1201 "memory",
1202 HashMap::from([(
1203 MEMORY_CATALOG_WAREHOUSE.to_string(),
1204 warehouse_location.clone(),
1205 )]),
1206 )
1207 .await
1208 .unwrap();
1209
1210 let namespace_ident = NamespaceIdent::new("a".into());
1211 let mut namespace_properties = HashMap::new();
1212 let namespace_location = temp_path();
1213 namespace_properties.insert(LOCATION.to_string(), namespace_location.to_string());
1214 catalog
1215 .create_namespace(&namespace_ident, namespace_properties)
1216 .await
1217 .unwrap();
1218
1219 let nested_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1220 let mut nested_namespace_properties = HashMap::new();
1221 let nested_namespace_location = temp_path();
1222 nested_namespace_properties
1223 .insert(LOCATION.to_string(), nested_namespace_location.to_string());
1224 catalog
1225 .create_namespace(&nested_namespace_ident, nested_namespace_properties)
1226 .await
1227 .unwrap();
1228
1229 let table_name = "tbl1";
1230 let expected_table_ident =
1231 TableIdent::new(nested_namespace_ident.clone(), table_name.into());
1232 let expected_table_metadata_location_regex = format!(
1233 "^{}/tbl1/metadata/00000-{}.metadata.json$",
1234 nested_namespace_location, UUID_REGEX_STR,
1235 );
1236
1237 let table = catalog
1238 .create_table(
1239 &nested_namespace_ident,
1240 TableCreation::builder()
1241 .name(table_name.into())
1242 .schema(simple_table_schema())
1243 .build(),
1245 )
1246 .await
1247 .unwrap();
1248 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1249 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1250
1251 let table = catalog.load_table(&expected_table_ident).await.unwrap();
1252 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1253 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1254 }
1255
1256 #[tokio::test]
1257 async fn test_create_table_falls_back_to_warehouse_location_if_both_table_location_and_namespace_location_are_missing()
1258 {
1259 let warehouse_location = temp_path();
1260 let catalog = MemoryCatalogBuilder::default()
1261 .load(
1262 "memory",
1263 HashMap::from([(
1264 MEMORY_CATALOG_WAREHOUSE.to_string(),
1265 warehouse_location.clone(),
1266 )]),
1267 )
1268 .await
1269 .unwrap();
1270
1271 let namespace_ident = NamespaceIdent::new("a".into());
1272 let namespace_properties = HashMap::new();
1274 catalog
1275 .create_namespace(&namespace_ident, namespace_properties)
1276 .await
1277 .unwrap();
1278
1279 let table_name = "tbl1";
1280 let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1281 let expected_table_metadata_location_regex = format!(
1282 "^{}/a/tbl1/metadata/00000-{}.metadata.json$",
1283 warehouse_location, UUID_REGEX_STR
1284 );
1285
1286 let table = catalog
1287 .create_table(
1288 &namespace_ident,
1289 TableCreation::builder()
1290 .name(table_name.into())
1291 .schema(simple_table_schema())
1292 .build(),
1294 )
1295 .await
1296 .unwrap();
1297 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1298 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1299
1300 let table = catalog.load_table(&expected_table_ident).await.unwrap();
1301 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1302 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1303 }
1304
1305 #[tokio::test]
1306 async fn test_create_table_in_nested_namespace_falls_back_to_warehouse_location_if_both_table_location_and_namespace_location_are_missing()
1307 {
1308 let warehouse_location = temp_path();
1309 let catalog = MemoryCatalogBuilder::default()
1310 .load(
1311 "memory",
1312 HashMap::from([(
1313 MEMORY_CATALOG_WAREHOUSE.to_string(),
1314 warehouse_location.clone(),
1315 )]),
1316 )
1317 .await
1318 .unwrap();
1319
1320 let namespace_ident = NamespaceIdent::new("a".into());
1321 catalog
1322 .create_namespace(&namespace_ident, HashMap::new())
1324 .await
1325 .unwrap();
1326
1327 let nested_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1328 catalog
1329 .create_namespace(&nested_namespace_ident, HashMap::new())
1331 .await
1332 .unwrap();
1333
1334 let table_name = "tbl1";
1335 let expected_table_ident =
1336 TableIdent::new(nested_namespace_ident.clone(), table_name.into());
1337 let expected_table_metadata_location_regex = format!(
1338 "^{}/a/b/tbl1/metadata/00000-{}.metadata.json$",
1339 warehouse_location, UUID_REGEX_STR
1340 );
1341
1342 let table = catalog
1343 .create_table(
1344 &nested_namespace_ident,
1345 TableCreation::builder()
1346 .name(table_name.into())
1347 .schema(simple_table_schema())
1348 .build(),
1350 )
1351 .await
1352 .unwrap();
1353 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1354 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1355
1356 let table = catalog.load_table(&expected_table_ident).await.unwrap();
1357 assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1358 assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1359 }
1360
1361 #[tokio::test]
1362 async fn test_create_table_throws_error_if_table_location_and_namespace_location_and_warehouse_location_are_missing()
1363 {
1364 let catalog = MemoryCatalogBuilder::default()
1365 .load("memory", HashMap::from([]))
1366 .await;
1367
1368 assert!(catalog.is_err());
1369 assert_eq!(
1370 catalog.unwrap_err().to_string(),
1371 "DataInvalid => Catalog warehouse is required"
1372 );
1373 }
1374
1375 #[tokio::test]
1376 async fn test_create_table_throws_error_if_table_with_same_name_already_exists() {
1377 let catalog = new_memory_catalog().await;
1378 let namespace_ident = NamespaceIdent::new("a".into());
1379 create_namespace(&catalog, &namespace_ident).await;
1380 let table_name = "tbl1";
1381 let table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1382 create_table(&catalog, &table_ident).await;
1383
1384 let tmp_dir = TempDir::new().unwrap();
1385 let location = tmp_dir.path().to_str().unwrap().to_string();
1386
1387 assert_eq!(
1388 catalog
1389 .create_table(
1390 &namespace_ident,
1391 TableCreation::builder()
1392 .name(table_name.into())
1393 .schema(simple_table_schema())
1394 .location(location)
1395 .build()
1396 )
1397 .await
1398 .unwrap_err()
1399 .to_string(),
1400 format!(
1401 "TableAlreadyExists => Cannot create table {:?}. Table already exists.",
1402 &table_ident
1403 )
1404 );
1405 }
1406
1407 #[tokio::test]
1408 async fn test_list_tables_returns_empty_vector() {
1409 let catalog = new_memory_catalog().await;
1410 let namespace_ident = NamespaceIdent::new("a".into());
1411 create_namespace(&catalog, &namespace_ident).await;
1412
1413 assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![]);
1414 }
1415
1416 #[tokio::test]
1417 async fn test_list_tables_returns_a_single_table() {
1418 let catalog = new_memory_catalog().await;
1419 let namespace_ident = NamespaceIdent::new("n1".into());
1420 create_namespace(&catalog, &namespace_ident).await;
1421
1422 let table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1423 create_table(&catalog, &table_ident).await;
1424
1425 assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![
1426 table_ident
1427 ]);
1428 }
1429
1430 #[tokio::test]
1431 async fn test_list_tables_returns_multiple_tables() {
1432 let catalog = new_memory_catalog().await;
1433 let namespace_ident = NamespaceIdent::new("n1".into());
1434 create_namespace(&catalog, &namespace_ident).await;
1435
1436 let table_ident_1 = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1437 let table_ident_2 = TableIdent::new(namespace_ident.clone(), "tbl2".into());
1438 let _ = create_tables(&catalog, vec![&table_ident_1, &table_ident_2]).await;
1439
1440 assert_eq!(
1441 to_set(catalog.list_tables(&namespace_ident).await.unwrap()),
1442 to_set(vec![table_ident_1, table_ident_2])
1443 );
1444 }
1445
1446 #[tokio::test]
1447 async fn test_list_tables_returns_tables_from_correct_namespace() {
1448 let catalog = new_memory_catalog().await;
1449 let namespace_ident_1 = NamespaceIdent::new("n1".into());
1450 let namespace_ident_2 = NamespaceIdent::new("n2".into());
1451 create_namespaces(&catalog, &vec![&namespace_ident_1, &namespace_ident_2]).await;
1452
1453 let table_ident_1 = TableIdent::new(namespace_ident_1.clone(), "tbl1".into());
1454 let table_ident_2 = TableIdent::new(namespace_ident_1.clone(), "tbl2".into());
1455 let table_ident_3 = TableIdent::new(namespace_ident_2.clone(), "tbl1".into());
1456 let _ = create_tables(&catalog, vec![
1457 &table_ident_1,
1458 &table_ident_2,
1459 &table_ident_3,
1460 ])
1461 .await;
1462
1463 assert_eq!(
1464 to_set(catalog.list_tables(&namespace_ident_1).await.unwrap()),
1465 to_set(vec![table_ident_1, table_ident_2])
1466 );
1467
1468 assert_eq!(
1469 to_set(catalog.list_tables(&namespace_ident_2).await.unwrap()),
1470 to_set(vec![table_ident_3])
1471 );
1472 }
1473
1474 #[tokio::test]
1475 async fn test_list_tables_returns_table_under_nested_namespace() {
1476 let catalog = new_memory_catalog().await;
1477 let namespace_ident_a = NamespaceIdent::new("a".into());
1478 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1479 create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1480
1481 let table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl1".into());
1482 create_table(&catalog, &table_ident).await;
1483
1484 assert_eq!(
1485 catalog.list_tables(&namespace_ident_a_b).await.unwrap(),
1486 vec![table_ident]
1487 );
1488 }
1489
1490 #[tokio::test]
1491 async fn test_list_tables_throws_error_if_namespace_doesnt_exist() {
1492 let catalog = new_memory_catalog().await;
1493
1494 let non_existent_namespace_ident = NamespaceIdent::new("n1".into());
1495
1496 assert_eq!(
1497 catalog
1498 .list_tables(&non_existent_namespace_ident)
1499 .await
1500 .unwrap_err()
1501 .to_string(),
1502 format!(
1503 "NamespaceNotFound => No such namespace: {:?}",
1504 non_existent_namespace_ident
1505 ),
1506 );
1507 }
1508
1509 #[tokio::test]
1510 async fn test_drop_table() {
1511 let catalog = new_memory_catalog().await;
1512 let namespace_ident = NamespaceIdent::new("n1".into());
1513 create_namespace(&catalog, &namespace_ident).await;
1514 let table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1515 create_table(&catalog, &table_ident).await;
1516
1517 catalog.drop_table(&table_ident).await.unwrap();
1518 }
1519
1520 #[tokio::test]
1521 async fn test_drop_table_drops_table_under_nested_namespace() {
1522 let catalog = new_memory_catalog().await;
1523 let namespace_ident_a = NamespaceIdent::new("a".into());
1524 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1525 create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1526
1527 let table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl1".into());
1528 create_table(&catalog, &table_ident).await;
1529
1530 catalog.drop_table(&table_ident).await.unwrap();
1531
1532 assert_eq!(
1533 catalog.list_tables(&namespace_ident_a_b).await.unwrap(),
1534 vec![]
1535 );
1536 }
1537
1538 #[tokio::test]
1539 async fn test_drop_table_throws_error_if_namespace_doesnt_exist() {
1540 let catalog = new_memory_catalog().await;
1541
1542 let non_existent_namespace_ident = NamespaceIdent::new("n1".into());
1543 let non_existent_table_ident =
1544 TableIdent::new(non_existent_namespace_ident.clone(), "tbl1".into());
1545
1546 assert_eq!(
1547 catalog
1548 .drop_table(&non_existent_table_ident)
1549 .await
1550 .unwrap_err()
1551 .to_string(),
1552 format!(
1553 "NamespaceNotFound => No such namespace: {:?}",
1554 non_existent_namespace_ident
1555 ),
1556 );
1557 }
1558
1559 #[tokio::test]
1560 async fn test_drop_table_throws_error_if_table_doesnt_exist() {
1561 let catalog = new_memory_catalog().await;
1562 let namespace_ident = NamespaceIdent::new("n1".into());
1563 create_namespace(&catalog, &namespace_ident).await;
1564
1565 let non_existent_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1566
1567 assert_eq!(
1568 catalog
1569 .drop_table(&non_existent_table_ident)
1570 .await
1571 .unwrap_err()
1572 .to_string(),
1573 format!(
1574 "TableNotFound => No such table: {:?}",
1575 non_existent_table_ident
1576 ),
1577 );
1578 }
1579
1580 #[tokio::test]
1581 async fn test_table_exists_returns_true() {
1582 let catalog = new_memory_catalog().await;
1583 let namespace_ident = NamespaceIdent::new("n1".into());
1584 create_namespace(&catalog, &namespace_ident).await;
1585 let table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1586 create_table(&catalog, &table_ident).await;
1587
1588 assert!(catalog.table_exists(&table_ident).await.unwrap());
1589 }
1590
1591 #[tokio::test]
1592 async fn test_table_exists_returns_false() {
1593 let catalog = new_memory_catalog().await;
1594 let namespace_ident = NamespaceIdent::new("n1".into());
1595 create_namespace(&catalog, &namespace_ident).await;
1596 let non_existent_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1597
1598 assert!(
1599 !catalog
1600 .table_exists(&non_existent_table_ident)
1601 .await
1602 .unwrap()
1603 );
1604 }
1605
1606 #[tokio::test]
1607 async fn test_table_exists_under_nested_namespace() {
1608 let catalog = new_memory_catalog().await;
1609 let namespace_ident_a = NamespaceIdent::new("a".into());
1610 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1611 create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1612
1613 let table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl1".into());
1614 create_table(&catalog, &table_ident).await;
1615
1616 assert!(catalog.table_exists(&table_ident).await.unwrap());
1617
1618 let non_existent_table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl2".into());
1619 assert!(
1620 !catalog
1621 .table_exists(&non_existent_table_ident)
1622 .await
1623 .unwrap()
1624 );
1625 }
1626
1627 #[tokio::test]
1628 async fn test_table_exists_throws_error_if_namespace_doesnt_exist() {
1629 let catalog = new_memory_catalog().await;
1630
1631 let non_existent_namespace_ident = NamespaceIdent::new("n1".into());
1632 let non_existent_table_ident =
1633 TableIdent::new(non_existent_namespace_ident.clone(), "tbl1".into());
1634
1635 assert_eq!(
1636 catalog
1637 .table_exists(&non_existent_table_ident)
1638 .await
1639 .unwrap_err()
1640 .to_string(),
1641 format!(
1642 "NamespaceNotFound => No such namespace: {:?}",
1643 non_existent_namespace_ident
1644 ),
1645 );
1646 }
1647
1648 #[tokio::test]
1649 async fn test_rename_table_in_same_namespace() {
1650 let catalog = new_memory_catalog().await;
1651 let namespace_ident = NamespaceIdent::new("n1".into());
1652 create_namespace(&catalog, &namespace_ident).await;
1653 let src_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1654 let dst_table_ident = TableIdent::new(namespace_ident.clone(), "tbl2".into());
1655 create_table(&catalog, &src_table_ident).await;
1656
1657 catalog
1658 .rename_table(&src_table_ident, &dst_table_ident)
1659 .await
1660 .unwrap();
1661
1662 assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![
1663 dst_table_ident
1664 ],);
1665 }
1666
1667 #[tokio::test]
1668 async fn test_rename_table_across_namespaces() {
1669 let catalog = new_memory_catalog().await;
1670 let src_namespace_ident = NamespaceIdent::new("a".into());
1671 let dst_namespace_ident = NamespaceIdent::new("b".into());
1672 create_namespaces(&catalog, &vec![&src_namespace_ident, &dst_namespace_ident]).await;
1673 let src_table_ident = TableIdent::new(src_namespace_ident.clone(), "tbl1".into());
1674 let dst_table_ident = TableIdent::new(dst_namespace_ident.clone(), "tbl2".into());
1675 create_table(&catalog, &src_table_ident).await;
1676
1677 catalog
1678 .rename_table(&src_table_ident, &dst_table_ident)
1679 .await
1680 .unwrap();
1681
1682 assert_eq!(
1683 catalog.list_tables(&src_namespace_ident).await.unwrap(),
1684 vec![],
1685 );
1686
1687 assert_eq!(
1688 catalog.list_tables(&dst_namespace_ident).await.unwrap(),
1689 vec![dst_table_ident],
1690 );
1691 }
1692
1693 #[tokio::test]
1694 async fn test_rename_table_src_table_is_same_as_dst_table() {
1695 let catalog = new_memory_catalog().await;
1696 let namespace_ident = NamespaceIdent::new("n1".into());
1697 create_namespace(&catalog, &namespace_ident).await;
1698 let table_ident = TableIdent::new(namespace_ident.clone(), "tbl".into());
1699 create_table(&catalog, &table_ident).await;
1700
1701 catalog
1702 .rename_table(&table_ident, &table_ident)
1703 .await
1704 .unwrap();
1705
1706 assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![
1707 table_ident
1708 ],);
1709 }
1710
1711 #[tokio::test]
1712 async fn test_rename_table_across_nested_namespaces() {
1713 let catalog = new_memory_catalog().await;
1714 let namespace_ident_a = NamespaceIdent::new("a".into());
1715 let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1716 let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
1717 create_namespaces(&catalog, &vec![
1718 &namespace_ident_a,
1719 &namespace_ident_a_b,
1720 &namespace_ident_a_b_c,
1721 ])
1722 .await;
1723
1724 let src_table_ident = TableIdent::new(namespace_ident_a_b_c.clone(), "tbl1".into());
1725 create_tables(&catalog, vec![&src_table_ident]).await;
1726
1727 let dst_table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl1".into());
1728 catalog
1729 .rename_table(&src_table_ident, &dst_table_ident)
1730 .await
1731 .unwrap();
1732
1733 assert!(!catalog.table_exists(&src_table_ident).await.unwrap());
1734
1735 assert!(catalog.table_exists(&dst_table_ident).await.unwrap());
1736 }
1737
1738 #[tokio::test]
1739 async fn test_rename_table_throws_error_if_src_namespace_doesnt_exist() {
1740 let catalog = new_memory_catalog().await;
1741
1742 let non_existent_src_namespace_ident = NamespaceIdent::new("n1".into());
1743 let src_table_ident =
1744 TableIdent::new(non_existent_src_namespace_ident.clone(), "tbl1".into());
1745
1746 let dst_namespace_ident = NamespaceIdent::new("n2".into());
1747 create_namespace(&catalog, &dst_namespace_ident).await;
1748 let dst_table_ident = TableIdent::new(dst_namespace_ident.clone(), "tbl1".into());
1749
1750 assert_eq!(
1751 catalog
1752 .rename_table(&src_table_ident, &dst_table_ident)
1753 .await
1754 .unwrap_err()
1755 .to_string(),
1756 format!(
1757 "NamespaceNotFound => No such namespace: {:?}",
1758 non_existent_src_namespace_ident
1759 ),
1760 );
1761 }
1762
1763 #[tokio::test]
1764 async fn test_rename_table_throws_error_if_dst_namespace_doesnt_exist() {
1765 let catalog = new_memory_catalog().await;
1766 let src_namespace_ident = NamespaceIdent::new("n1".into());
1767 let src_table_ident = TableIdent::new(src_namespace_ident.clone(), "tbl1".into());
1768 create_namespace(&catalog, &src_namespace_ident).await;
1769 create_table(&catalog, &src_table_ident).await;
1770
1771 let non_existent_dst_namespace_ident = NamespaceIdent::new("n2".into());
1772 let dst_table_ident =
1773 TableIdent::new(non_existent_dst_namespace_ident.clone(), "tbl1".into());
1774 assert_eq!(
1775 catalog
1776 .rename_table(&src_table_ident, &dst_table_ident)
1777 .await
1778 .unwrap_err()
1779 .to_string(),
1780 format!(
1781 "NamespaceNotFound => No such namespace: {:?}",
1782 non_existent_dst_namespace_ident
1783 ),
1784 );
1785 }
1786
1787 #[tokio::test]
1788 async fn test_rename_table_throws_error_if_src_table_doesnt_exist() {
1789 let catalog = new_memory_catalog().await;
1790 let namespace_ident = NamespaceIdent::new("n1".into());
1791 create_namespace(&catalog, &namespace_ident).await;
1792 let src_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1793 let dst_table_ident = TableIdent::new(namespace_ident.clone(), "tbl2".into());
1794
1795 assert_eq!(
1796 catalog
1797 .rename_table(&src_table_ident, &dst_table_ident)
1798 .await
1799 .unwrap_err()
1800 .to_string(),
1801 format!("TableNotFound => No such table: {:?}", src_table_ident),
1802 );
1803 }
1804
1805 #[tokio::test]
1806 async fn test_rename_table_throws_error_if_dst_table_already_exists() {
1807 let catalog = new_memory_catalog().await;
1808 let namespace_ident = NamespaceIdent::new("n1".into());
1809 create_namespace(&catalog, &namespace_ident).await;
1810 let src_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1811 let dst_table_ident = TableIdent::new(namespace_ident.clone(), "tbl2".into());
1812 create_tables(&catalog, vec![&src_table_ident, &dst_table_ident]).await;
1813
1814 assert_eq!(
1815 catalog
1816 .rename_table(&src_table_ident, &dst_table_ident)
1817 .await
1818 .unwrap_err()
1819 .to_string(),
1820 format!(
1821 "TableAlreadyExists => Cannot create table {:?}. Table already exists.",
1822 &dst_table_ident
1823 ),
1824 );
1825 }
1826
1827 #[tokio::test]
1828 async fn test_register_table() {
1829 let catalog = new_memory_catalog().await;
1831 let namespace_ident = NamespaceIdent::new("test_namespace".into());
1832 create_namespace(&catalog, &namespace_ident).await;
1833
1834 let source_table_ident = TableIdent::new(namespace_ident.clone(), "source_table".into());
1836 create_table(&catalog, &source_table_ident).await;
1837
1838 let source_table = catalog.load_table(&source_table_ident).await.unwrap();
1840 let metadata_location = source_table.metadata_location().unwrap().to_string();
1841
1842 let register_table_ident =
1844 TableIdent::new(namespace_ident.clone(), "register_table".into());
1845 let registered_table = catalog
1846 .register_table(®ister_table_ident, metadata_location.clone())
1847 .await
1848 .unwrap();
1849
1850 assert_eq!(registered_table.identifier(), ®ister_table_ident);
1852
1853 assert_eq!(
1855 registered_table.metadata_location().unwrap().to_string(),
1856 metadata_location
1857 );
1858
1859 assert!(catalog.table_exists(®ister_table_ident).await.unwrap());
1861
1862 let loaded_table = catalog.load_table(®ister_table_ident).await.unwrap();
1864 assert_eq!(loaded_table.identifier(), ®ister_table_ident);
1865 assert_eq!(
1866 loaded_table.metadata_location().unwrap().to_string(),
1867 metadata_location
1868 );
1869 }
1870
1871 #[tokio::test]
1872 async fn test_update_table() {
1873 let catalog = new_memory_catalog().await;
1874
1875 let table = create_table_with_namespace(&catalog).await;
1876
1877 assert!(!table.metadata().properties().contains_key("key"));
1879
1880 let tx = Transaction::new(&table);
1882 let updated_table = tx
1883 .update_table_properties()
1884 .set("key".to_string(), "value".to_string())
1885 .apply(tx)
1886 .unwrap()
1887 .commit(&catalog)
1888 .await
1889 .unwrap();
1890
1891 assert_eq!(
1892 updated_table.metadata().properties().get("key").unwrap(),
1893 "value"
1894 );
1895
1896 assert_eq!(table.identifier(), updated_table.identifier());
1897 assert_eq!(table.metadata().uuid(), updated_table.metadata().uuid());
1898 assert!(table.metadata().last_updated_ms() < updated_table.metadata().last_updated_ms());
1899 assert_ne!(table.metadata_location(), updated_table.metadata_location());
1900
1901 assert!(
1902 table.metadata().metadata_log().len() < updated_table.metadata().metadata_log().len()
1903 );
1904 }
1905
1906 #[tokio::test]
1907 async fn test_update_table_fails_if_table_doesnt_exist() {
1908 let catalog = new_memory_catalog().await;
1909
1910 let namespace_ident = NamespaceIdent::new("a".into());
1911 create_namespace(&catalog, &namespace_ident).await;
1912
1913 let table_ident = TableIdent::new(namespace_ident, "test".to_string());
1915 let table = build_table(table_ident);
1916
1917 let tx = Transaction::new(&table);
1918 let err = tx
1919 .update_table_properties()
1920 .set("key".to_string(), "value".to_string())
1921 .apply(tx)
1922 .unwrap()
1923 .commit(&catalog)
1924 .await
1925 .unwrap_err();
1926 assert_eq!(err.kind(), ErrorKind::TableNotFound);
1927 }
1928
1929 fn build_table(ident: TableIdent) -> Table {
1930 let file_io = FileIOBuilder::new_fs_io().build().unwrap();
1931
1932 let temp_dir = TempDir::new().unwrap();
1933 let location = temp_dir.path().to_str().unwrap().to_string();
1934
1935 let table_creation = TableCreation::builder()
1936 .name(ident.name().to_string())
1937 .schema(simple_table_schema())
1938 .location(location)
1939 .build();
1940 let metadata = TableMetadataBuilder::from_table_creation(table_creation)
1941 .unwrap()
1942 .build()
1943 .unwrap()
1944 .metadata;
1945
1946 Table::builder()
1947 .identifier(ident)
1948 .metadata(metadata)
1949 .file_io(file_io)
1950 .build()
1951 .unwrap()
1952 }
1953}