Skip to main content

mz_persist/
azure.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! An Azure Blob Storage implementation of [Blob] storage.
11
12use anyhow::{Context, anyhow};
13use async_trait::async_trait;
14use azure_core::{ExponentialRetryOptions, RetryOptions, StatusCode, TransportOptions};
15use azure_identity::create_default_credential;
16use azure_storage::{CloudLocation, EMULATOR_ACCOUNT, prelude::*};
17use azure_storage_blobs::blob::operations::GetBlobResponse;
18use azure_storage_blobs::prelude::*;
19use bytes::Bytes;
20use futures_util::StreamExt;
21use futures_util::stream::FuturesOrdered;
22use std::fmt::{Debug, Formatter};
23use std::sync::Arc;
24use std::time::Duration;
25use tracing::{info, warn};
26use url::Url;
27use uuid::Uuid;
28
29use mz_ore::bytes::SegmentedBytes;
30use mz_ore::cast::CastFrom;
31use mz_ore::metrics::MetricsRegistry;
32
33use crate::cfg::BlobKnobs;
34use crate::error::Error;
35use crate::location::{Blob, BlobMetadata, Determinate, ExternalError};
36use crate::metrics::S3BlobMetrics;
37
38/// Configuration for opening an [AzureBlob].
39#[derive(Clone, Debug)]
40pub struct AzureBlobConfig {
41    metrics: S3BlobMetrics,
42    client: ContainerClient,
43    prefix: String,
44}
45
46impl AzureBlobConfig {
47    const EXTERNAL_TESTS_AZURE_CONTAINER: &'static str =
48        "MZ_PERSIST_EXTERNAL_STORAGE_TEST_AZURE_CONTAINER";
49
50    /// Returns a new [AzureBlobConfig] for use in production.
51    ///
52    /// Stores objects in the given container prepended with the (possibly empty)
53    /// prefix. Azure credentials must be available in the process or environment.
54    pub fn new(
55        account: String,
56        container: String,
57        prefix: String,
58        metrics: S3BlobMetrics,
59        url: Url,
60        knobs: Box<dyn BlobKnobs>,
61    ) -> Result<Self, Error> {
62        let transport = TransportOptions::new(Arc::new(
63            reqwest::ClientBuilder::new()
64                .timeout(knobs.operation_attempt_timeout())
65                .read_timeout(knobs.read_timeout())
66                .connect_timeout(knobs.connect_timeout())
67                .build()
68                .expect("valid config for azure HTTP client"),
69        ));
70        let retry = RetryOptions::exponential(
71            ExponentialRetryOptions::default().max_total_elapsed(knobs.operation_timeout()),
72        );
73
74        let client = if account == EMULATOR_ACCOUNT {
75            info!("Connecting to Azure emulator");
76            ClientBuilder::with_location(
77                CloudLocation::Emulator {
78                    address: url.domain().expect("domain for Azure emulator").to_string(),
79                    port: url.port().expect("port for Azure emulator"),
80                },
81                StorageCredentials::emulator(),
82            )
83        } else {
84            let sas_credentials = match url.query() {
85                Some(query) => Some(StorageCredentials::sas_token(query)),
86                None => None,
87            };
88
89            let credentials = match sas_credentials {
90                Some(Ok(credentials)) => credentials,
91                Some(Err(err)) => {
92                    warn!("Failed to parse SAS token: {err}");
93                    // TODO: should we fallback here? Or can we fully rely on query params
94                    // to determine whether a SAS token was provided?
95                    StorageCredentials::token_credential(
96                        create_default_credential().expect("Azure default credentials"),
97                    )
98                }
99                None => StorageCredentials::token_credential(
100                    create_default_credential().expect("Azure default credentials"),
101                ),
102            };
103
104            ClientBuilder::new(account, credentials)
105        }
106        .transport(transport)
107        .retry(retry)
108        .blob_service_client()
109        .container_client(container);
110
111        // TODO: some auth modes like user-delegated SAS tokens are time-limited
112        // and need to be refreshed. This can be done through `service_client.update_credentials`
113        // but there'll be a fair bit of plumbing needed to make each mode work
114
115        Ok(AzureBlobConfig {
116            metrics,
117            client,
118            prefix,
119        })
120    }
121
122    /// Returns a new [AzureBlobConfig] for use in unit tests.
123    pub fn new_for_test() -> Result<Option<Self>, Error> {
124        struct TestBlobKnobs;
125        impl Debug for TestBlobKnobs {
126            fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
127                f.debug_struct("TestBlobKnobs").finish_non_exhaustive()
128            }
129        }
130        impl BlobKnobs for TestBlobKnobs {
131            fn operation_timeout(&self) -> Duration {
132                Duration::from_secs(30)
133            }
134
135            fn operation_attempt_timeout(&self) -> Duration {
136                Duration::from_secs(10)
137            }
138
139            fn connect_timeout(&self) -> Duration {
140                Duration::from_secs(5)
141            }
142
143            fn read_timeout(&self) -> Duration {
144                Duration::from_secs(5)
145            }
146
147            fn is_cc_active(&self) -> bool {
148                false
149            }
150        }
151
152        let container_name = match std::env::var(Self::EXTERNAL_TESTS_AZURE_CONTAINER) {
153            Ok(container) => container,
154            Err(_) => {
155                assert!(
156                    !mz_ore::env::is_var_truthy("CI"),
157                    "CI is supposed to run this test but something has gone wrong!"
158                );
159                return Ok(None);
160            }
161        };
162
163        let prefix = Uuid::new_v4().to_string();
164        let metrics = S3BlobMetrics::new(&MetricsRegistry::new());
165
166        let config = AzureBlobConfig::new(
167            EMULATOR_ACCOUNT.to_string(),
168            container_name.clone(),
169            prefix,
170            metrics,
171            Url::parse(&format!("http://localhost:40111/{}", container_name)).expect("valid url"),
172            Box::new(TestBlobKnobs),
173        )?;
174
175        Ok(Some(config))
176    }
177}
178
179/// Implementation of [Blob] backed by Azure Blob Storage.
180#[derive(Debug)]
181pub struct AzureBlob {
182    metrics: S3BlobMetrics,
183    client: ContainerClient,
184    prefix: String,
185}
186
187impl AzureBlob {
188    /// Opens the given location for non-exclusive read-write access.
189    pub async fn open(config: AzureBlobConfig) -> Result<Self, ExternalError> {
190        if config.client.service_client().account() == EMULATOR_ACCOUNT {
191            // TODO: we could move this logic into the test harness.
192            // it's currently here because it's surprisingly annoying to
193            // create the container out-of-band
194            if let Err(error) = config.client.create().await {
195                info!(
196                    ?error,
197                    "failed to create emulator container; this is expected on repeat runs"
198                );
199            }
200        }
201
202        let ret = AzureBlob {
203            metrics: config.metrics,
204            client: config.client,
205            prefix: config.prefix,
206        };
207
208        Ok(ret)
209    }
210
211    fn get_path(&self, key: &str) -> String {
212        format!("{}/{}", self.prefix, key)
213    }
214}
215
216#[async_trait]
217impl Blob for AzureBlob {
218    async fn get(&self, key: &str) -> Result<Option<SegmentedBytes>, ExternalError> {
219        let path = self.get_path(key);
220        let blob = self.client.blob_client(path);
221
222        /// Fetch the body of a single [`GetBlobResponse`].
223        async fn fetch_chunk(
224            response: GetBlobResponse,
225            metrics: S3BlobMetrics,
226        ) -> Result<Vec<Bytes>, ExternalError> {
227            let content_length = response.blob.properties.content_length;
228
229            let mut parts: Vec<Bytes> = Vec::new();
230            let mut total_len: u64 = 0;
231            let mut body = response.data;
232            while let Some(value) = body.next().await {
233                let value = value
234                    .map_err(|e| ExternalError::from(e.context("azure blob get body error")))?;
235                total_len += u64::cast_from(value.len());
236                parts.push(value);
237            }
238
239            // Report if the content-length header didn't match the number of
240            // bytes we read from the network.
241            if content_length != total_len {
242                metrics.get_invalid_resp.inc();
243            }
244
245            Ok(parts)
246        }
247
248        let mut requests = FuturesOrdered::new();
249        // TODO: the default chunk size is 1MB. We have not tried tuning it,
250        // but making this configurable / running some benchmarks could be
251        // valuable.
252        let mut stream = blob.get().into_stream();
253
254        while let Some(value) = stream.next().await {
255            // Return early if any of the individual fetch requests return an error.
256            let response = match value {
257                Ok(v) => v,
258                Err(e) => {
259                    if let Some(e) = e.as_http_error() {
260                        if e.status() == StatusCode::NotFound {
261                            return Ok(None);
262                        }
263                    }
264
265                    return Err(ExternalError::from(e.context("azure blob get error")));
266                }
267            };
268
269            // Drive all of the fetch requests concurrently.
270            let metrics = self.metrics.clone();
271            requests.push_back(fetch_chunk(response, metrics));
272        }
273
274        // Await on all of our chunks.
275        let mut segments = SegmentedBytes::with_capacity(requests.len());
276        while let Some(body) = requests.next().await {
277            for part in body.context("azure blob get body err")? {
278                segments.push(part);
279            }
280        }
281
282        Ok(Some(segments))
283    }
284
285    async fn list_keys_and_metadata(
286        &self,
287        key_prefix: &str,
288        f: &mut (dyn FnMut(BlobMetadata) + Send + Sync),
289    ) -> Result<(), ExternalError> {
290        let blob_key_prefix = self.get_path(key_prefix);
291        let strippable_root_prefix = format!("{}/", self.prefix);
292
293        let mut stream = self
294            .client
295            .list_blobs()
296            .prefix(blob_key_prefix.clone())
297            .into_stream();
298
299        while let Some(response) = stream.next().await {
300            let response =
301                response.map_err(|e| ExternalError::from(e.context("azure blob list error")))?;
302
303            for blob in response.blobs.items {
304                let azure_storage_blobs::container::operations::list_blobs::BlobItem::Blob(blob) =
305                    blob
306                else {
307                    continue;
308                };
309
310                if let Some(key) = blob.name.strip_prefix(&strippable_root_prefix) {
311                    let size_in_bytes = blob.properties.content_length;
312                    f(BlobMetadata { key, size_in_bytes });
313                }
314            }
315        }
316
317        Ok(())
318    }
319
320    async fn set(&self, key: &str, value: Bytes) -> Result<(), ExternalError> {
321        let path = self.get_path(key);
322        let blob = self.client.blob_client(path);
323
324        blob.put_block_blob(value)
325            .await
326            .map_err(|e| ExternalError::from(e.context("azure blob put error")))?;
327
328        Ok(())
329    }
330
331    async fn delete(&self, key: &str) -> Result<Option<usize>, ExternalError> {
332        let path = self.get_path(key);
333        let blob = self.client.blob_client(path);
334
335        match blob.get_properties().await {
336            Ok(props) => {
337                let size = usize::cast_from(props.blob.properties.content_length);
338                blob.delete()
339                    .await
340                    .map_err(|e| ExternalError::from(e.context("azure blob delete error")))?;
341                Ok(Some(size))
342            }
343            Err(e) => {
344                if let Some(e) = e.as_http_error() {
345                    if e.status() == StatusCode::NotFound {
346                        return Ok(None);
347                    }
348                }
349
350                Err(ExternalError::from(e.context("azure blob error")))
351            }
352        }
353    }
354
355    async fn restore(&self, key: &str) -> Result<(), ExternalError> {
356        let path = self.get_path(key);
357        let blob = self.client.blob_client(&path);
358
359        match blob.get_properties().await {
360            Ok(_) => Ok(()),
361            Err(e) => {
362                if let Some(e) = e.as_http_error() {
363                    if e.status() == StatusCode::NotFound {
364                        return Err(Determinate::new(anyhow!(
365                            "azure blob error: unable to restore non-existent key {key}"
366                        ))
367                        .into());
368                    }
369                }
370
371                Err(ExternalError::from(e.context("azure blob error")))
372            }
373        }
374    }
375}
376
#[cfg(test)]
mod tests {
    use tracing::info;

    use crate::location::tests::blob_impl_test;

    use super::*;

    /// Runs the shared [Blob] conformance suite against the Azure emulator.
    ///
    /// This is skipped (with a log line) when the test container env var is unset.
    #[cfg_attr(miri, ignore)] // error: unsupported operation: can't call foreign function `TLS_method` on OS `linux`
    #[mz_ore::test(tokio::test(flavor = "multi_thread"))]
    async fn azure_blob() -> Result<(), ExternalError> {
        let Some(config) = AzureBlobConfig::new_for_test()? else {
            info!(
                "{} env not set: skipping test that uses external service",
                AzureBlobConfig::EXTERNAL_TESTS_AZURE_CONTAINER
            );
            return Ok(());
        };

        blob_impl_test(move |_path| {
            // Every opened instance gets its own copy of the shared config, so
            // they all point at the same container and prefix.
            let per_open_config = config.clone();
            async move { AzureBlob::open(per_open_config).await }
        })
        .await
    }
}