mz_persist/
intercept.rs
1use std::fmt::Debug;
13use std::sync::{Arc, Mutex};
14
15use async_trait::async_trait;
16use bytes::Bytes;
17use mz_ore::bytes::SegmentedBytes;
18
19use crate::location::{Blob, BlobMetadata, ExternalError};
20
21pub type PostDeleteFn = Arc<
23 dyn Fn(&str, Result<Option<usize>, ExternalError>) -> Result<Option<usize>, ExternalError>
24 + Send
25 + Sync,
26>;
27
28#[derive(Default)]
29struct InterceptCore {
30 post_delete: Option<PostDeleteFn>,
31}
32
33impl Debug for InterceptCore {
34 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
35 let InterceptCore { post_delete } = self;
36 f.debug_struct("InterceptCore")
37 .field(
38 "post_delete",
39 &post_delete.as_ref().map(|x| format!("{:p}", x)),
40 )
41 .finish()
42 }
43}
44
45#[derive(Clone, Debug, Default)]
47pub struct InterceptHandle {
48 core: Arc<Mutex<InterceptCore>>,
49}
50
51impl InterceptHandle {
52 pub fn set_post_delete(&self, f: Option<PostDeleteFn>) -> Option<PostDeleteFn> {
56 let mut core = self.core.lock().expect("lock should not be poisoned");
57 std::mem::replace(&mut core.post_delete, f)
58 }
59}
60
61#[derive(Debug)]
66pub struct InterceptBlob {
67 handle: InterceptHandle,
68 blob: Arc<dyn Blob>,
69}
70
71impl InterceptBlob {
72 pub fn new(blob: Arc<dyn Blob>, handle: InterceptHandle) -> Self {
74 InterceptBlob { handle, blob }
75 }
76}
77
78#[async_trait]
79impl Blob for InterceptBlob {
80 async fn get(&self, key: &str) -> Result<Option<SegmentedBytes>, ExternalError> {
81 self.blob.get(key).await
82 }
83
84 async fn list_keys_and_metadata(
85 &self,
86 key_prefix: &str,
87 f: &mut (dyn FnMut(BlobMetadata) + Send + Sync),
88 ) -> Result<(), ExternalError> {
89 self.blob.list_keys_and_metadata(key_prefix, f).await
90 }
91
92 async fn set(&self, key: &str, value: Bytes) -> Result<(), ExternalError> {
93 self.blob.set(key, value).await
94 }
95
96 async fn delete(&self, key: &str) -> Result<Option<usize>, ExternalError> {
97 let ret = self.blob.delete(key).await;
98 let post_delete = self
99 .handle
100 .core
101 .lock()
102 .expect("lock should not be poisoned")
103 .post_delete
104 .clone();
105 match post_delete {
106 Some(x) => x(key, ret),
107 None => ret,
108 }
109 }
110
111 async fn restore(&self, key: &str) -> Result<(), ExternalError> {
112 self.blob.restore(key).await
113 }
114}