mz_persist/
file.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! File-backed implementations for testing and benchmarking.
11
12use std::io::ErrorKind;
13use std::path::{Path, PathBuf};
14
15use anyhow::anyhow;
16use async_trait::async_trait;
17use bytes::Bytes;
18use fail::fail_point;
19use mz_ore::bytes::SegmentedBytes;
20use mz_ore::cast::CastFrom;
21use tokio::fs::{self, File};
22use tokio::io::{AsyncReadExt, AsyncWriteExt};
23
24use crate::error::Error;
25use crate::location::{Blob, BlobMetadata, Determinate, ExternalError};
26
/// Settings consumed by [FileBlob::open] when constructing a [FileBlob].
#[derive(Clone, Debug)]
pub struct FileBlobConfig {
    /// Directory in which every blob file lives; created on open if missing.
    base_dir: PathBuf,
    /// When true, deleted blobs are renamed to a `.bak` tombstone instead of
    /// being removed from disk.
    pub(crate) tombstone: bool,
}
33
34impl<P: AsRef<Path>> From<P> for FileBlobConfig {
35    fn from(base_dir: P) -> Self {
36        FileBlobConfig {
37            base_dir: base_dir.as_ref().to_path_buf(),
38            tombstone: false,
39        }
40    }
41}
42
/// A [Blob] that keeps each blob as its own file inside one flat directory.
#[derive(Debug)]
pub struct FileBlob {
    /// Directory holding every blob file.
    base_dir: PathBuf,
    /// If true, `delete` renames a blob to a `.bak` tombstone rather than
    /// removing it.
    tombstone: bool,
}
49
50impl FileBlob {
51    /// Opens the given location for non-exclusive read-write access.
52    pub async fn open(config: FileBlobConfig) -> Result<Self, ExternalError> {
53        let FileBlobConfig {
54            base_dir,
55            tombstone,
56        } = config;
57        fs::create_dir_all(&base_dir).await.map_err(Error::from)?;
58        Ok(FileBlob {
59            base_dir,
60            tombstone,
61        })
62    }
63
64    fn blob_path(&self, key: &str) -> PathBuf {
65        self.base_dir.join(key)
66    }
67
68    /// For simplicity, FileBlob maintains a single flat directory of blobs. Because files
69    /// can never use forward slashes in their names on Linux, even if escaped, we replace
70    /// forward slashes with a Unicode character that looks substantially similar, so the
71    /// file name is not interpreted as part of a directory structure. This is helpful for
72    /// compatibility with clients who are expecting an S3-like interface that both use a
73    /// flat hierarchy and allow forward slashes in file names.
74    ///
75    /// (And apologies to the callers who really did want to use U+2215 code points in their
76    /// filenames.)
77    fn replace_forward_slashes(key: &str) -> String {
78        key.replace('/', "∕")
79    }
80
81    fn restore_forward_slashes(key: &str) -> String {
82        key.replace('∕', "/")
83    }
84}
85
86#[async_trait]
87impl Blob for FileBlob {
88    async fn get(&self, key: &str) -> Result<Option<SegmentedBytes>, ExternalError> {
89        let file_path = self.blob_path(&FileBlob::replace_forward_slashes(key));
90        let mut file = match File::open(file_path).await {
91            Ok(file) => file,
92            Err(err) if err.kind() == ErrorKind::NotFound => return Ok(None),
93            Err(err) => return Err(err.into()),
94        };
95        let mut buf = Vec::new();
96        file.read_to_end(&mut buf).await?;
97        Ok(Some(SegmentedBytes::from(buf)))
98    }
99
100    async fn list_keys_and_metadata(
101        &self,
102        key_prefix: &str,
103        f: &mut (dyn FnMut(BlobMetadata) + Send + Sync),
104    ) -> Result<(), ExternalError> {
105        let base_dir = self.base_dir.canonicalize()?;
106
107        let mut entries = fs::read_dir(&base_dir).await?;
108        while let Some(entry) = entries.next_entry().await? {
109            let path = entry.path().canonicalize()?;
110
111            if !path.is_file() {
112                // Ignore '.' and '..' directory entries if they come up.
113                if path == base_dir {
114                    continue;
115                } else if let Some(parent) = base_dir.parent() {
116                    if path == parent {
117                        continue;
118                    }
119                } else {
120                    return Err(ExternalError::from(anyhow!(
121                        "unexpectedly found directory while iterating through FileBlob: {}",
122                        path.display()
123                    )));
124                }
125            }
126
127            // "Real" blobs don't have extensions. We want to exclude "expected" extensions
128            // like soft-deleted blobs or in-progress writes, and complain otherwise.
129            match path.extension().and_then(|os| os.to_str()) {
130                Some("bak" | "tmp") => continue,
131                Some(other) => Err(ExternalError::from(anyhow!(
132                    "Found blob with unexpected suffix: {other}"
133                )))?,
134                None => {}
135            }
136
137            // The file name is guaranteed to be non-None iff the path is a
138            // normal file.
139            let file_name = path.file_name();
140            if let Some(name) = file_name {
141                let name = name.to_str();
142                if let Some(name) = name {
143                    if !name.starts_with(key_prefix) {
144                        continue;
145                    }
146
147                    f(BlobMetadata {
148                        key: &FileBlob::restore_forward_slashes(name),
149                        size_in_bytes: entry.metadata().await?.len(),
150                    });
151                }
152            }
153        }
154        Ok(())
155    }
156
157    async fn set(&self, key: &str, value: Bytes) -> Result<(), ExternalError> {
158        let file_path = self.blob_path(&FileBlob::replace_forward_slashes(key));
159
160        // To implement atomic set, write to a temp file and rename it into
161        // place.
162        let mut tmp_name = file_path.clone();
163        debug_assert_eq!(tmp_name.extension(), None);
164        tmp_name.set_extension("tmp");
165        // NB: Don't use create_new(true) for this so that if we have a partial
166        // one from a previous crash, it will just get overwritten (which is
167        // safe).
168        let mut file = File::create(&tmp_name).await?;
169        file.write_all(&value[..]).await?;
170
171        fail_point!("fileblob_set_sync", |_| {
172            Err(ExternalError::from(anyhow!(
173                "FileBlob::set_sync fail point reached for file {:?}",
174                file_path
175            )))
176        });
177
178        // fsync the file, so its contents are visible
179        file.sync_all().await?;
180
181        let parent_dir = File::open(&self.base_dir).await?;
182        // fsync the directory so it can guaranteed see the tmp file
183        parent_dir.sync_all().await?;
184
185        // atomically rename our file
186        fs::rename(tmp_name, &file_path).await?;
187
188        // fsync the directory once again to guarantee it can see the renamed
189        // file
190        parent_dir.sync_all().await?;
191
192        Ok(())
193    }
194
195    async fn delete(&self, key: &str) -> Result<Option<usize>, ExternalError> {
196        let file_path = self.blob_path(&FileBlob::replace_forward_slashes(key));
197        // TODO: strict correctness requires that we fsync the parent directory
198        // as well after file removal.
199
200        fail_point!("fileblob_delete_before", |_| {
201            Err(ExternalError::from(anyhow!(
202                "FileBlob::delete_before fail point reached for file {:?}",
203                file_path
204            )))
205        });
206
207        // There is a race condition here between metadata and remove_file where
208        // we won't return the correct length of the deleted file if it changes
209        // between the two calls. Luckily 1. we only every write to a given key
210        // once and it never changes and 2. this is only used for metrics
211        // anyway.
212
213        let size_bytes = match fs::metadata(&file_path).await {
214            Ok(x) => x.len(),
215            Err(err) => {
216                // delete is documented to succeed if the key doesn't exist.
217                if err.kind() == ErrorKind::NotFound {
218                    return Ok(None);
219                }
220                return Err(err.into());
221            }
222        };
223
224        let result = if self.tombstone {
225            // Instead of removing the file, rename it!
226            let mut tombstone_name = file_path.clone();
227            assert_eq!(tombstone_name.extension(), None);
228            tombstone_name.set_extension("bak");
229            fs::rename(&file_path, &tombstone_name).await
230        } else {
231            fs::remove_file(&file_path).await
232        };
233
234        if let Err(err) = result {
235            // delete is documented to succeed if the key doesn't exist.
236            if err.kind() == ErrorKind::NotFound {
237                return Ok(None);
238            }
239            return Err(err.into());
240        };
241
242        fail_point!("fileblob_delete_after", |_| {
243            Err(ExternalError::from(anyhow!(
244                "FileBlob::delete_after fail point reached for file {:?}",
245                file_path
246            )))
247        });
248
249        Ok(Some(usize::cast_from(size_bytes)))
250    }
251
252    async fn restore(&self, key: &str) -> Result<(), ExternalError> {
253        let file_path = self.blob_path(&FileBlob::replace_forward_slashes(key));
254
255        if fs::try_exists(&file_path).await? {
256            return Ok(());
257        }
258
259        let mut tombstone_name = file_path.clone();
260        assert_eq!(tombstone_name.extension(), None);
261        tombstone_name.set_extension("bak");
262        match fs::rename(&tombstone_name, &file_path).await {
263            Err(e) if e.kind() == ErrorKind::NotFound => {
264                // TODO: should we treat not-found as determinate elsewhere also?
265                Err(Determinate::new(anyhow!("no tombstone or file for key: {key}")).into())
266            }
267            other => Ok(other?),
268        }
269    }
270}
271
#[cfg(test)]
mod tests {
    use crate::location::tests::blob_impl_test;

    use super::*;

    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // error: unsupported operation: integer-to-pointer casts and `ptr::from_exposed_addr` are not supported with `-Zmiri-strict-provenance`
    async fn file_blob() -> Result<(), ExternalError> {
        // Run the shared `blob_impl_test` suite twice, first with plain deletes
        // and then with tombstoning, each against a fresh temp directory.
        for tombstone in [false, true] {
            let temp_dir = tempfile::tempdir().map_err(Error::from)?;
            blob_impl_test(move |path| {
                let mut cfg: FileBlobConfig = temp_dir.path().join(path).into();
                cfg.tombstone = tombstone;
                FileBlob::open(cfg)
            })
            .await?;
        }

        Ok(())
    }
}