1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
910//! File backed implementations for testing and benchmarking.
1112use std::io::ErrorKind;
13use std::path::{Path, PathBuf};
1415use anyhow::anyhow;
16use async_trait::async_trait;
17use bytes::Bytes;
18use fail::fail_point;
19use mz_ore::bytes::SegmentedBytes;
20use mz_ore::cast::CastFrom;
21use tokio::fs::{self, File};
22use tokio::io::{AsyncReadExt, AsyncWriteExt};
2324use crate::error::Error;
25use crate::location::{Blob, BlobMetadata, Determinate, ExternalError};
/// Settings used to open a [FileBlob].
///
/// Usually built from anything path-like through the `From` conversion, which
/// leaves tombstoning switched off.
#[derive(Debug, Clone)]
pub struct FileBlobConfig {
    /// Directory that holds every blob file.
    base_dir: PathBuf,
    /// When `true`, deleted blobs are renamed to a `.bak` file rather than
    /// being removed.
    pub(crate) tombstone: bool,
}
3334impl<P: AsRef<Path>> From<P> for FileBlobConfig {
35fn from(base_dir: P) -> Self {
36 FileBlobConfig {
37 base_dir: base_dir.as_ref().to_path_buf(),
38 tombstone: false,
39 }
40 }
41}
/// A [Blob] implementation that keeps each blob in its own file.
#[derive(Debug)]
pub struct FileBlob {
    /// Directory that holds every blob file.
    base_dir: PathBuf,
    /// When `true`, `delete` renames the blob to a `.bak` file instead of
    /// removing it.
    tombstone: bool,
}
4950impl FileBlob {
51/// Opens the given location for non-exclusive read-write access.
52pub async fn open(config: FileBlobConfig) -> Result<Self, ExternalError> {
53let FileBlobConfig {
54 base_dir,
55 tombstone,
56 } = config;
57 fs::create_dir_all(&base_dir).await.map_err(Error::from)?;
58Ok(FileBlob {
59 base_dir,
60 tombstone,
61 })
62 }
6364fn blob_path(&self, key: &str) -> PathBuf {
65self.base_dir.join(key)
66 }
6768/// For simplicity, FileBlob maintains a single flat directory of blobs. Because files
69 /// can never use forward slashes in their names on Linux, even if escaped, we replace
70 /// forward slashes with a Unicode character that looks substantially similar, so the
71 /// file name is not interpreted as part of a directory structure. This is helpful for
72 /// compatibility with clients who are expecting an S3-like interface that both use a
73 /// flat hierarchy and allow forward slashes in file names.
74 ///
75 /// (And apologies to the callers who really did want to use U+2215 code points in their
76 /// filenames.)
77fn replace_forward_slashes(key: &str) -> String {
78 key.replace('/', "∕")
79 }
8081fn restore_forward_slashes(key: &str) -> String {
82 key.replace('∕', "/")
83 }
84}
8586#[async_trait]
87impl Blob for FileBlob {
88async fn get(&self, key: &str) -> Result<Option<SegmentedBytes>, ExternalError> {
89let file_path = self.blob_path(&FileBlob::replace_forward_slashes(key));
90let mut file = match File::open(file_path).await {
91Ok(file) => file,
92Err(err) if err.kind() == ErrorKind::NotFound => return Ok(None),
93Err(err) => return Err(err.into()),
94 };
95let mut buf = Vec::new();
96 file.read_to_end(&mut buf).await?;
97Ok(Some(SegmentedBytes::from(buf)))
98 }
99100async fn list_keys_and_metadata(
101&self,
102 key_prefix: &str,
103 f: &mut (dyn FnMut(BlobMetadata) + Send + Sync),
104 ) -> Result<(), ExternalError> {
105let base_dir = self.base_dir.canonicalize()?;
106107let mut entries = fs::read_dir(&base_dir).await?;
108while let Some(entry) = entries.next_entry().await? {
109let path = entry.path().canonicalize()?;
110111if !path.is_file() {
112// Ignore '.' and '..' directory entries if they come up.
113if path == base_dir {
114continue;
115 } else if let Some(parent) = base_dir.parent() {
116if path == parent {
117continue;
118 }
119 } else {
120return Err(ExternalError::from(anyhow!(
121"unexpectedly found directory while iterating through FileBlob: {}",
122 path.display()
123 )));
124 }
125 }
126127// "Real" blobs don't have extensions. We want to exclude "expected" extensions
128 // like soft-deleted blobs or in-progress writes, and complain otherwise.
129match path.extension().and_then(|os| os.to_str()) {
130Some("bak" | "tmp") => continue,
131Some(other) => Err(ExternalError::from(anyhow!(
132"Found blob with unexpected suffix: {other}"
133)))?,
134None => {}
135 }
136137// The file name is guaranteed to be non-None iff the path is a
138 // normal file.
139let file_name = path.file_name();
140if let Some(name) = file_name {
141let name = name.to_str();
142if let Some(name) = name {
143if !name.starts_with(key_prefix) {
144continue;
145 }
146147 f(BlobMetadata {
148 key: &FileBlob::restore_forward_slashes(name),
149 size_in_bytes: entry.metadata().await?.len(),
150 });
151 }
152 }
153 }
154Ok(())
155 }
156157async fn set(&self, key: &str, value: Bytes) -> Result<(), ExternalError> {
158let file_path = self.blob_path(&FileBlob::replace_forward_slashes(key));
159160// To implement atomic set, write to a temp file and rename it into
161 // place.
162let mut tmp_name = file_path.clone();
163debug_assert_eq!(tmp_name.extension(), None);
164 tmp_name.set_extension("tmp");
165// NB: Don't use create_new(true) for this so that if we have a partial
166 // one from a previous crash, it will just get overwritten (which is
167 // safe).
168let mut file = File::create(&tmp_name).await?;
169 file.write_all(&value[..]).await?;
170171fail_point!("fileblob_set_sync", |_| {
172Err(ExternalError::from(anyhow!(
173"FileBlob::set_sync fail point reached for file {:?}",
174 file_path
175 )))
176 });
177178// fsync the file, so its contents are visible
179file.sync_all().await?;
180181let parent_dir = File::open(&self.base_dir).await?;
182// fsync the directory so it can guaranteed see the tmp file
183parent_dir.sync_all().await?;
184185// atomically rename our file
186fs::rename(tmp_name, &file_path).await?;
187188// fsync the directory once again to guarantee it can see the renamed
189 // file
190parent_dir.sync_all().await?;
191192Ok(())
193 }
194195async fn delete(&self, key: &str) -> Result<Option<usize>, ExternalError> {
196let file_path = self.blob_path(&FileBlob::replace_forward_slashes(key));
197// TODO: strict correctness requires that we fsync the parent directory
198 // as well after file removal.
199200fail_point!("fileblob_delete_before", |_| {
201Err(ExternalError::from(anyhow!(
202"FileBlob::delete_before fail point reached for file {:?}",
203 file_path
204 )))
205 });
206207// There is a race condition here between metadata and remove_file where
208 // we won't return the correct length of the deleted file if it changes
209 // between the two calls. Luckily 1. we only every write to a given key
210 // once and it never changes and 2. this is only used for metrics
211 // anyway.
212213let size_bytes = match fs::metadata(&file_path).await {
214Ok(x) => x.len(),
215Err(err) => {
216// delete is documented to succeed if the key doesn't exist.
217if err.kind() == ErrorKind::NotFound {
218return Ok(None);
219 }
220return Err(err.into());
221 }
222 };
223224let result = if self.tombstone {
225// Instead of removing the file, rename it!
226let mut tombstone_name = file_path.clone();
227assert_eq!(tombstone_name.extension(), None);
228 tombstone_name.set_extension("bak");
229 fs::rename(&file_path, &tombstone_name).await
230} else {
231 fs::remove_file(&file_path).await
232};
233234if let Err(err) = result {
235// delete is documented to succeed if the key doesn't exist.
236if err.kind() == ErrorKind::NotFound {
237return Ok(None);
238 }
239return Err(err.into());
240 };
241242fail_point!("fileblob_delete_after", |_| {
243Err(ExternalError::from(anyhow!(
244"FileBlob::delete_after fail point reached for file {:?}",
245 file_path
246 )))
247 });
248249Ok(Some(usize::cast_from(size_bytes)))
250 }
251252async fn restore(&self, key: &str) -> Result<(), ExternalError> {
253let file_path = self.blob_path(&FileBlob::replace_forward_slashes(key));
254255if fs::try_exists(&file_path).await? {
256return Ok(());
257 }
258259let mut tombstone_name = file_path.clone();
260assert_eq!(tombstone_name.extension(), None);
261 tombstone_name.set_extension("bak");
262match fs::rename(&tombstone_name, &file_path).await {
263Err(e) if e.kind() == ErrorKind::NotFound => {
264// TODO: should we treat not-found as determinate elsewhere also?
265Err(Determinate::new(anyhow!("no tombstone or file for key: {key}")).into())
266 }
267 other => Ok(other?),
268 }
269 }
270}
#[cfg(test)]
mod tests {
    use crate::location::tests::blob_impl_test;

    use super::*;

    // Runs `blob_impl_test` against FileBlob twice: first with plain deletes,
    // then with tombstoning enabled.
    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // error: unsupported operation: integer-to-pointer casts and `ptr::from_exposed_addr` are not supported with `-Zmiri-strict-provenance`
    async fn file_blob() -> Result<(), ExternalError> {
        for tombstone in [false, true] {
            // Each mode gets its own scratch directory; the closure owns it so
            // it stays alive for the duration of the run.
            let scratch = tempfile::tempdir().map_err(Error::from)?;
            blob_impl_test(move |path| {
                let mut config = FileBlobConfig::from(scratch.path().join(path));
                config.tombstone = tombstone;
                FileBlob::open(config)
            })
            .await?;
        }

        Ok(())
    }
}