1use anyhow::{Context, anyhow};
13use async_trait::async_trait;
14use azure_core::{ExponentialRetryOptions, RetryOptions, StatusCode, TransportOptions};
15use azure_identity::create_default_credential;
16use azure_storage::{CloudLocation, EMULATOR_ACCOUNT, prelude::*};
17use azure_storage_blobs::blob::operations::GetBlobResponse;
18use azure_storage_blobs::prelude::*;
19use bytes::Bytes;
20use futures_util::StreamExt;
21use futures_util::stream::FuturesOrdered;
22use std::fmt::{Debug, Formatter};
23use std::sync::Arc;
24use std::time::Duration;
25use tracing::{info, warn};
26use url::Url;
27use uuid::Uuid;
28
29use mz_ore::bytes::SegmentedBytes;
30use mz_ore::cast::CastFrom;
31use mz_ore::metrics::MetricsRegistry;
32
33use crate::cfg::BlobKnobs;
34use crate::error::Error;
35use crate::location::{Blob, BlobMetadata, Determinate, ExternalError};
36use crate::metrics::S3BlobMetrics;
37
38#[derive(Clone, Debug)]
40pub struct AzureBlobConfig {
41 metrics: S3BlobMetrics,
42 client: ContainerClient,
43 prefix: String,
44}
45
46impl AzureBlobConfig {
47 const EXTERNAL_TESTS_AZURE_CONTAINER: &'static str =
48 "MZ_PERSIST_EXTERNAL_STORAGE_TEST_AZURE_CONTAINER";
49
50 pub fn new(
55 account: String,
56 container: String,
57 prefix: String,
58 metrics: S3BlobMetrics,
59 url: Url,
60 knobs: Box<dyn BlobKnobs>,
61 ) -> Result<Self, Error> {
62 let transport = TransportOptions::new(Arc::new(
63 reqwest::ClientBuilder::new()
64 .timeout(knobs.operation_attempt_timeout())
65 .read_timeout(knobs.read_timeout())
66 .connect_timeout(knobs.connect_timeout())
67 .build()
68 .expect("valid config for azure HTTP client"),
69 ));
70 let retry = RetryOptions::exponential(
71 ExponentialRetryOptions::default().max_total_elapsed(knobs.operation_timeout()),
72 );
73
74 let client = if account == EMULATOR_ACCOUNT {
75 info!("Connecting to Azure emulator");
76 ClientBuilder::with_location(
77 CloudLocation::Emulator {
78 address: url.domain().expect("domain for Azure emulator").to_string(),
79 port: url.port().expect("port for Azure emulator"),
80 },
81 StorageCredentials::emulator(),
82 )
83 } else {
84 let sas_credentials = match url.query() {
85 Some(query) => Some(StorageCredentials::sas_token(query)),
86 None => None,
87 };
88
89 let credentials = match sas_credentials {
90 Some(Ok(credentials)) => credentials,
91 Some(Err(err)) => {
92 warn!("Failed to parse SAS token: {err}");
93 StorageCredentials::token_credential(
96 create_default_credential().expect("Azure default credentials"),
97 )
98 }
99 None => StorageCredentials::token_credential(
100 create_default_credential().expect("Azure default credentials"),
101 ),
102 };
103
104 ClientBuilder::new(account, credentials)
105 }
106 .transport(transport)
107 .retry(retry)
108 .blob_service_client()
109 .container_client(container);
110
111 Ok(AzureBlobConfig {
116 metrics,
117 client,
118 prefix,
119 })
120 }
121
122 pub fn new_for_test() -> Result<Option<Self>, Error> {
124 struct TestBlobKnobs;
125 impl Debug for TestBlobKnobs {
126 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
127 f.debug_struct("TestBlobKnobs").finish_non_exhaustive()
128 }
129 }
130 impl BlobKnobs for TestBlobKnobs {
131 fn operation_timeout(&self) -> Duration {
132 Duration::from_secs(30)
133 }
134
135 fn operation_attempt_timeout(&self) -> Duration {
136 Duration::from_secs(10)
137 }
138
139 fn connect_timeout(&self) -> Duration {
140 Duration::from_secs(5)
141 }
142
143 fn read_timeout(&self) -> Duration {
144 Duration::from_secs(5)
145 }
146
147 fn is_cc_active(&self) -> bool {
148 false
149 }
150 }
151
152 let container_name = match std::env::var(Self::EXTERNAL_TESTS_AZURE_CONTAINER) {
153 Ok(container) => container,
154 Err(_) => {
155 assert!(
156 !mz_ore::env::is_var_truthy("CI"),
157 "CI is supposed to run this test but something has gone wrong!"
158 );
159 return Ok(None);
160 }
161 };
162
163 let prefix = Uuid::new_v4().to_string();
164 let metrics = S3BlobMetrics::new(&MetricsRegistry::new());
165
166 let config = AzureBlobConfig::new(
167 EMULATOR_ACCOUNT.to_string(),
168 container_name.clone(),
169 prefix,
170 metrics,
171 Url::parse(&format!("http://localhost:40111/{}", container_name)).expect("valid url"),
172 Box::new(TestBlobKnobs),
173 )?;
174
175 Ok(Some(config))
176 }
177}
178
179#[derive(Debug)]
181pub struct AzureBlob {
182 metrics: S3BlobMetrics,
183 client: ContainerClient,
184 prefix: String,
185}
186
187impl AzureBlob {
188 pub async fn open(config: AzureBlobConfig) -> Result<Self, ExternalError> {
190 if config.client.service_client().account() == EMULATOR_ACCOUNT {
191 if let Err(error) = config.client.create().await {
195 info!(
196 ?error,
197 "failed to create emulator container; this is expected on repeat runs"
198 );
199 }
200 }
201
202 let ret = AzureBlob {
203 metrics: config.metrics,
204 client: config.client,
205 prefix: config.prefix,
206 };
207
208 Ok(ret)
209 }
210
211 fn get_path(&self, key: &str) -> String {
212 format!("{}/{}", self.prefix, key)
213 }
214}
215
216#[async_trait]
217impl Blob for AzureBlob {
218 async fn get(&self, key: &str) -> Result<Option<SegmentedBytes>, ExternalError> {
219 let path = self.get_path(key);
220 let blob = self.client.blob_client(path);
221
222 async fn fetch_chunk(
224 response: GetBlobResponse,
225 metrics: S3BlobMetrics,
226 ) -> Result<Vec<Bytes>, ExternalError> {
227 let content_length = response.blob.properties.content_length;
228
229 let mut parts: Vec<Bytes> = Vec::new();
230 let mut total_len: u64 = 0;
231 let mut body = response.data;
232 while let Some(value) = body.next().await {
233 let value = value
234 .map_err(|e| ExternalError::from(e.context("azure blob get body error")))?;
235 total_len += u64::cast_from(value.len());
236 parts.push(value);
237 }
238
239 if content_length != total_len {
242 metrics.get_invalid_resp.inc();
243 }
244
245 Ok(parts)
246 }
247
248 let mut requests = FuturesOrdered::new();
249 let mut stream = blob.get().into_stream();
253
254 while let Some(value) = stream.next().await {
255 let response = match value {
257 Ok(v) => v,
258 Err(e) => {
259 if let Some(e) = e.as_http_error() {
260 if e.status() == StatusCode::NotFound {
261 return Ok(None);
262 }
263 }
264
265 return Err(ExternalError::from(e.context("azure blob get error")));
266 }
267 };
268
269 let metrics = self.metrics.clone();
271 requests.push_back(fetch_chunk(response, metrics));
272 }
273
274 let mut segments = SegmentedBytes::with_capacity(requests.len());
276 while let Some(body) = requests.next().await {
277 for part in body.context("azure blob get body err")? {
278 segments.push(part);
279 }
280 }
281
282 Ok(Some(segments))
283 }
284
285 async fn list_keys_and_metadata(
286 &self,
287 key_prefix: &str,
288 f: &mut (dyn FnMut(BlobMetadata) + Send + Sync),
289 ) -> Result<(), ExternalError> {
290 let blob_key_prefix = self.get_path(key_prefix);
291 let strippable_root_prefix = format!("{}/", self.prefix);
292
293 let mut stream = self
294 .client
295 .list_blobs()
296 .prefix(blob_key_prefix.clone())
297 .into_stream();
298
299 while let Some(response) = stream.next().await {
300 let response =
301 response.map_err(|e| ExternalError::from(e.context("azure blob list error")))?;
302
303 for blob in response.blobs.items {
304 let azure_storage_blobs::container::operations::list_blobs::BlobItem::Blob(blob) =
305 blob
306 else {
307 continue;
308 };
309
310 if let Some(key) = blob.name.strip_prefix(&strippable_root_prefix) {
311 let size_in_bytes = blob.properties.content_length;
312 f(BlobMetadata { key, size_in_bytes });
313 }
314 }
315 }
316
317 Ok(())
318 }
319
320 async fn set(&self, key: &str, value: Bytes) -> Result<(), ExternalError> {
321 let path = self.get_path(key);
322 let blob = self.client.blob_client(path);
323
324 blob.put_block_blob(value)
325 .await
326 .map_err(|e| ExternalError::from(e.context("azure blob put error")))?;
327
328 Ok(())
329 }
330
331 async fn delete(&self, key: &str) -> Result<Option<usize>, ExternalError> {
332 let path = self.get_path(key);
333 let blob = self.client.blob_client(path);
334
335 match blob.get_properties().await {
336 Ok(props) => {
337 let size = usize::cast_from(props.blob.properties.content_length);
338 blob.delete()
339 .await
340 .map_err(|e| ExternalError::from(e.context("azure blob delete error")))?;
341 Ok(Some(size))
342 }
343 Err(e) => {
344 if let Some(e) = e.as_http_error() {
345 if e.status() == StatusCode::NotFound {
346 return Ok(None);
347 }
348 }
349
350 Err(ExternalError::from(e.context("azure blob error")))
351 }
352 }
353 }
354
355 async fn restore(&self, key: &str) -> Result<(), ExternalError> {
356 let path = self.get_path(key);
357 let blob = self.client.blob_client(&path);
358
359 match blob.get_properties().await {
360 Ok(_) => Ok(()),
361 Err(e) => {
362 if let Some(e) = e.as_http_error() {
363 if e.status() == StatusCode::NotFound {
364 return Err(Determinate::new(anyhow!(
365 "azure blob error: unable to restore non-existent key {key}"
366 ))
367 .into());
368 }
369 }
370
371 Err(ExternalError::from(e.context("azure blob error")))
372 }
373 }
374 }
375}
376
#[cfg(test)]
mod tests {
    use tracing::info;

    use crate::location::tests::blob_impl_test;

    use super::*;

    /// Runs the shared blob test suite against the Azure implementation.
    ///
    /// The test returns early, logging a note, when the environment variable
    /// naming the test container is not set.
    #[cfg_attr(miri, ignore)]
    #[mz_ore::test(tokio::test(flavor = "multi_thread"))]
    async fn azure_blob() -> Result<(), ExternalError> {
        let Some(config) = AzureBlobConfig::new_for_test()? else {
            info!(
                "{} env not set: skipping test that uses external service",
                AzureBlobConfig::EXTERNAL_TESTS_AZURE_CONTAINER
            );
            return Ok(());
        };

        // Every invocation opens a new blob from its own copy of the config;
        // the path argument is not used.
        blob_impl_test(move |_path| AzureBlob::open(config.clone())).await
    }
}