1use std::io::ErrorKind;
13use std::path::{Path, PathBuf};
14
15use anyhow::anyhow;
16use async_trait::async_trait;
17use bytes::Bytes;
18use fail::fail_point;
19use mz_ore::bytes::SegmentedBytes;
20use mz_ore::cast::CastFrom;
21use tokio::fs::{self, File};
22use tokio::io::{AsyncReadExt, AsyncWriteExt};
23
24use crate::error::Error;
25use crate::location::{Blob, BlobMetadata, Determinate, ExternalError};
26
/// Settings used to open a [`FileBlob`].
///
/// Any value that can be viewed as a path converts into a config with
/// tombstoning disabled; see the `From` impl.
#[derive(Clone, Debug)]
pub struct FileBlobConfig {
    /// Directory that holds every blob file; created on open if missing.
    base_dir: PathBuf,
    /// When `true`, deleting a blob renames it to a `.bak` tombstone instead
    /// of removing the file, which lets `restore` bring it back later.
    pub(crate) tombstone: bool,
}
33
34impl<P: AsRef<Path>> From<P> for FileBlobConfig {
35 fn from(base_dir: P) -> Self {
36 FileBlobConfig {
37 base_dir: base_dir.as_ref().to_path_buf(),
38 tombstone: false,
39 }
40 }
41}
42
/// A `Blob` implementation that stores each key as one file directly inside
/// a single base directory.
#[derive(Debug)]
pub struct FileBlob {
    /// Directory containing all blob files.
    base_dir: PathBuf,
    /// Whether `delete` leaves a `.bak` tombstone behind instead of removing
    /// the file outright.
    tombstone: bool,
}
49
50impl FileBlob {
51 pub async fn open(config: FileBlobConfig) -> Result<Self, ExternalError> {
53 let FileBlobConfig {
54 base_dir,
55 tombstone,
56 } = config;
57 fs::create_dir_all(&base_dir).await.map_err(Error::from)?;
58 Ok(FileBlob {
59 base_dir,
60 tombstone,
61 })
62 }
63
64 fn blob_path(&self, key: &str) -> PathBuf {
65 self.base_dir.join(key)
66 }
67
68 fn replace_forward_slashes(key: &str) -> String {
78 key.replace('/', "∕")
79 }
80
81 fn restore_forward_slashes(key: &str) -> String {
82 key.replace('∕', "/")
83 }
84}
85
86#[async_trait]
87impl Blob for FileBlob {
88 async fn get(&self, key: &str) -> Result<Option<SegmentedBytes>, ExternalError> {
89 let file_path = self.blob_path(&FileBlob::replace_forward_slashes(key));
90 let mut file = match File::open(file_path).await {
91 Ok(file) => file,
92 Err(err) if err.kind() == ErrorKind::NotFound => return Ok(None),
93 Err(err) => return Err(err.into()),
94 };
95 let mut buf = Vec::new();
96 file.read_to_end(&mut buf).await?;
97 Ok(Some(SegmentedBytes::from(buf)))
98 }
99
100 async fn list_keys_and_metadata(
101 &self,
102 key_prefix: &str,
103 f: &mut (dyn FnMut(BlobMetadata) + Send + Sync),
104 ) -> Result<(), ExternalError> {
105 let base_dir = self.base_dir.canonicalize()?;
106
107 let mut entries = fs::read_dir(&base_dir).await?;
108 while let Some(entry) = entries.next_entry().await? {
109 let path = entry.path().canonicalize()?;
110
111 if !path.is_file() {
112 if path == base_dir {
114 continue;
115 } else if let Some(parent) = base_dir.parent() {
116 if path == parent {
117 continue;
118 }
119 } else {
120 return Err(ExternalError::from(anyhow!(
121 "unexpectedly found directory while iterating through FileBlob: {}",
122 path.display()
123 )));
124 }
125 }
126
127 match path.extension().and_then(|os| os.to_str()) {
130 Some("bak" | "tmp") => continue,
131 Some(other) => Err(ExternalError::from(anyhow!(
132 "Found blob with unexpected suffix: {other}"
133 )))?,
134 None => {}
135 }
136
137 let file_name = path.file_name();
140 if let Some(name) = file_name {
141 let name = name.to_str();
142 if let Some(name) = name {
143 if !name.starts_with(key_prefix) {
144 continue;
145 }
146
147 f(BlobMetadata {
148 key: &FileBlob::restore_forward_slashes(name),
149 size_in_bytes: entry.metadata().await?.len(),
150 });
151 }
152 }
153 }
154 Ok(())
155 }
156
157 async fn set(&self, key: &str, value: Bytes) -> Result<(), ExternalError> {
158 let file_path = self.blob_path(&FileBlob::replace_forward_slashes(key));
159
160 let mut tmp_name = file_path.clone();
163 debug_assert_eq!(tmp_name.extension(), None);
164 tmp_name.set_extension("tmp");
165 let mut file = File::create(&tmp_name).await?;
169 file.write_all(&value[..]).await?;
170
171 fail_point!("fileblob_set_sync", |_| {
172 Err(ExternalError::from(anyhow!(
173 "FileBlob::set_sync fail point reached for file {:?}",
174 file_path
175 )))
176 });
177
178 file.sync_all().await?;
180
181 let parent_dir = File::open(&self.base_dir).await?;
182 parent_dir.sync_all().await?;
184
185 fs::rename(tmp_name, &file_path).await?;
187
188 parent_dir.sync_all().await?;
191
192 Ok(())
193 }
194
195 async fn delete(&self, key: &str) -> Result<Option<usize>, ExternalError> {
196 let file_path = self.blob_path(&FileBlob::replace_forward_slashes(key));
197 fail_point!("fileblob_delete_before", |_| {
201 Err(ExternalError::from(anyhow!(
202 "FileBlob::delete_before fail point reached for file {:?}",
203 file_path
204 )))
205 });
206
207 let size_bytes = match fs::metadata(&file_path).await {
214 Ok(x) => x.len(),
215 Err(err) => {
216 if err.kind() == ErrorKind::NotFound {
218 return Ok(None);
219 }
220 return Err(err.into());
221 }
222 };
223
224 let result = if self.tombstone {
225 let mut tombstone_name = file_path.clone();
227 assert_eq!(tombstone_name.extension(), None);
228 tombstone_name.set_extension("bak");
229 fs::rename(&file_path, &tombstone_name).await
230 } else {
231 fs::remove_file(&file_path).await
232 };
233
234 if let Err(err) = result {
235 if err.kind() == ErrorKind::NotFound {
237 return Ok(None);
238 }
239 return Err(err.into());
240 };
241
242 fail_point!("fileblob_delete_after", |_| {
243 Err(ExternalError::from(anyhow!(
244 "FileBlob::delete_after fail point reached for file {:?}",
245 file_path
246 )))
247 });
248
249 Ok(Some(usize::cast_from(size_bytes)))
250 }
251
252 async fn restore(&self, key: &str) -> Result<(), ExternalError> {
253 let file_path = self.blob_path(&FileBlob::replace_forward_slashes(key));
254
255 if fs::try_exists(&file_path).await? {
256 return Ok(());
257 }
258
259 let mut tombstone_name = file_path.clone();
260 assert_eq!(tombstone_name.extension(), None);
261 tombstone_name.set_extension("bak");
262 match fs::rename(&tombstone_name, &file_path).await {
263 Err(e) if e.kind() == ErrorKind::NotFound => {
264 Err(Determinate::new(anyhow!("no tombstone or file for key: {key}")).into())
266 }
267 other => Ok(other?),
268 }
269 }
270}
271
#[cfg(test)]
mod tests {
    use crate::location::tests::blob_impl_test;

    use super::*;

    /// Exercises `FileBlob` through `blob_impl_test` twice: once with the
    /// default configuration and once with tombstoning enabled.
    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn file_blob() -> Result<(), ExternalError> {
        // Default configuration: deletes remove the file outright.
        let plain_root = tempfile::tempdir().map_err(Error::from)?;
        blob_impl_test(move |path| {
            let dir = plain_root.path().join(path);
            FileBlob::open(FileBlobConfig::from(dir))
        })
        .await?;

        // Tombstoning configuration: deletes leave a `.bak` file behind.
        let tombstone_root = tempfile::tempdir().map_err(Error::from)?;
        blob_impl_test(move |path| {
            let dir = tombstone_root.path().join(path);
            let cfg = FileBlobConfig {
                tombstone: true,
                ..FileBlobConfig::from(dir)
            };
            FileBlob::open(cfg)
        })
        .await?;

        Ok(())
    }
}