use std::fmt::Debug;
use std::sync::{Arc, Mutex};
use async_trait::async_trait;
use bytes::Bytes;
use mz_ore::bytes::SegmentedBytes;
use crate::location::{Blob, BlobMetadata, ExternalError};
/// A shareable closure that post-processes the outcome of a blob delete.
///
/// The closure is handed the key passed to `delete` together with the result
/// produced by the wrapped blob, and whatever it returns becomes the result
/// of the delete call. It must be `Send + Sync` because it is stored behind an
/// `Arc` and shared between handles.
pub type PostDeleteFn = Arc<
dyn Fn(&str, Result<Option<usize>, ExternalError>) -> Result<Option<usize>, ExternalError>
+ Send
+ Sync,
>;
/// The mutable state shared between an `InterceptHandle` and the blobs using it.
///
/// `Default` yields a core with no closure installed.
#[derive(Default)]
struct InterceptCore {
// Closure applied to the result of a delete, if one is installed.
post_delete: Option<PostDeleteFn>,
}
impl Debug for InterceptCore {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let InterceptCore { post_delete } = self;
f.debug_struct("InterceptCore")
.field(
"post_delete",
&post_delete.as_ref().map(|x| format!("{:p}", x)),
)
.finish()
}
}
/// A handle used to install or remove the closures consulted by an `InterceptBlob`.
///
/// The state lives behind an `Arc<Mutex<_>>`, so cloning the handle produces
/// another handle to the same shared state rather than an independent copy.
#[derive(Clone, Debug, Default)]
pub struct InterceptHandle {
// Shared, lock-protected closure slots.
core: Arc<Mutex<InterceptCore>>,
}
impl InterceptHandle {
    /// Swaps the closure applied to delete results, returning the previous one.
    ///
    /// Passing `None` removes any installed closure.
    ///
    /// # Panics
    ///
    /// Panics if the internal lock has been poisoned.
    pub fn set_post_delete(&self, f: Option<PostDeleteFn>) -> Option<PostDeleteFn> {
        let mut guard = self.core.lock().expect("lock should not be poisoned");
        let slot = &mut guard.post_delete;
        std::mem::replace(slot, f)
    }
}
/// A `Blob` wrapper that forwards every operation to an inner blob and lets an
/// `InterceptHandle` rewrite the result of `delete`.
#[derive(Debug)]
pub struct InterceptBlob {
// Source of the closures applied to intercepted operations.
handle: InterceptHandle,
// The wrapped blob that actually performs each operation.
blob: Arc<dyn Blob>,
}
impl InterceptBlob {
    /// Wraps `blob` so that its operations consult the closures set on `handle`.
    pub fn new(blob: Arc<dyn Blob>, handle: InterceptHandle) -> Self {
        Self { handle, blob }
    }
}
#[async_trait]
impl Blob for InterceptBlob {
    /// Forwards to the wrapped blob unchanged.
    async fn get(&self, key: &str) -> Result<Option<SegmentedBytes>, ExternalError> {
        self.blob.get(key).await
    }

    /// Forwards to the wrapped blob unchanged.
    async fn list_keys_and_metadata(
        &self,
        key_prefix: &str,
        f: &mut (dyn FnMut(BlobMetadata) + Send + Sync),
    ) -> Result<(), ExternalError> {
        self.blob.list_keys_and_metadata(key_prefix, f).await
    }

    /// Forwards to the wrapped blob unchanged.
    async fn set(&self, key: &str, value: Bytes) -> Result<(), ExternalError> {
        self.blob.set(key, value).await
    }

    /// Deletes `key` from the wrapped blob, then passes the outcome through the
    /// `post_delete` closure if one is installed.
    ///
    /// The wrapped delete always runs first; the closure only decides what the
    /// caller gets back.
    async fn delete(&self, key: &str) -> Result<Option<usize>, ExternalError> {
        let outcome = self.blob.delete(key).await;
        // Clone the closure out inside its own scope so the lock is released
        // before the closure is invoked.
        let hook = {
            let core = self
                .handle
                .core
                .lock()
                .expect("lock should not be poisoned");
            core.post_delete.clone()
        };
        if let Some(hook) = hook {
            hook(key, outcome)
        } else {
            outcome
        }
    }

    /// Forwards to the wrapped blob unchanged.
    async fn restore(&self, key: &str) -> Result<(), ExternalError> {
        self.blob.restore(key).await
    }
}