azure_storage_blobs/clients/
blob_client.rs

1use crate::{blob::operations::*, prelude::*};
2use azure_core::{
3    error::{Error, ErrorKind},
4    headers::Headers,
5    prelude::*,
6    Body, Method, Request, Response, StatusCode, Url,
7};
8use azure_storage::{
9    prelude::*,
10    shared_access_signature::service_sas::{BlobSharedAccessSignature, UserDeligationKey},
11    StorageCredentialsInner,
12};
13use futures::StreamExt;
14use std::ops::Deref;
15use time::OffsetDateTime;
16
17/// A client for handling blobs
18///
19/// For a full list of operations available on blobs, check out [the Azure documentation](https://docs.microsoft.com/en-us/rest/api/storageservices/operations-on-blobs).
20#[derive(Debug, Clone)]
21pub struct BlobClient {
22    container_client: ContainerClient,
23    blob_name: String,
24}
25
26impl BlobClient {
27    pub(crate) fn new(container_client: ContainerClient, blob_name: String) -> Self {
28        Self {
29            container_client,
30            blob_name,
31        }
32    }
33
34    pub fn from_sas_url(url: &Url) -> azure_core::Result<Self> {
35        let container_client = ContainerClient::from_sas_url(url)?;
36        // TODO: this currently only works for cloud locations Public and China
37        let path: Vec<_> = url.path().split_terminator('/').skip(2).collect();
38        if path.is_empty() {
39            Err(azure_core::Error::with_message(
40                azure_core::error::ErrorKind::DataConversion,
41                || "unable to find blob path",
42            ))
43        } else {
44            let path = path.join("/");
45            Ok(container_client.blob_client(path))
46        }
47    }
48
49    /// Stream a blob in chunks.
50    ///
51    /// By default, blobs are downloaded in 1MB chunks to reduce the impact of
52    /// intermittent network issues while downloading large blobs.
53    pub fn get(&self) -> GetBlobBuilder {
54        GetBlobBuilder::new(self.clone())
55    }
56
57    /// Return an entire blob.
58    pub async fn get_content(&self) -> azure_core::Result<Vec<u8>> {
59        let mut blob = Vec::new();
60        // NOTE: this uses the default chunk size of 1MB, which enables the
61        // pipeline to handle intermitent connection failures with retry, rather
62        // than restarting the whole blob on a failure.
63        let mut stream = self.get().into_stream();
64        while let Some(value) = stream.next().await {
65            let data = value?.data.collect().await?;
66            blob.extend(&data);
67        }
68        Ok(blob)
69    }
70
71    /// Get all user-defined metadata, standard HTTP properties, and system properties for the blob.
72    pub fn get_properties(&self) -> GetPropertiesBuilder {
73        GetPropertiesBuilder::new(self.clone())
74    }
75
76    /// Set blob properties.
77    ///
78    /// Several properties are cleared from the blob if not passed.
79    /// Consider calling `set_from_blob_properties` with existing blob properties.
80    pub fn set_properties(&self) -> SetPropertiesBuilder {
81        SetPropertiesBuilder::new(self.clone())
82    }
83
84    /// Get all user-defined metadata for the blob.
85    pub fn get_metadata(&self) -> GetMetadataBuilder {
86        GetMetadataBuilder::new(self.clone())
87    }
88
89    /// Set all user-defined metadata of the blob
90    pub fn set_metadata(&self) -> SetMetadataBuilder {
91        SetMetadataBuilder::new(self.clone())
92    }
93
94    /// Set the access tier on the blob.
95    pub fn set_blob_tier(&self, access_tier: AccessTier) -> SetBlobTierBuilder {
96        SetBlobTierBuilder::new(self.clone(), access_tier)
97    }
98
99    /// Set an expiry time on an existing blob.
100    ///
101    /// This operation is only allowed on Hierarchical Namespace enabled
102    /// accounts.
103    ///
104    /// ref: <https://docs.microsoft.com/en-us/rest/api/storageservices/set-blob-expiry>
105    pub fn set_blob_expiry(&self, blob_expiry: BlobExpiry) -> SetBlobExpiryBuilder {
106        SetBlobExpiryBuilder::new(self.clone(), blob_expiry)
107    }
108
109    /// Creates a new page blob.
110    pub fn put_page_blob(&self, length: u128) -> PutPageBlobBuilder {
111        PutPageBlobBuilder::new(self.clone(), length)
112    }
113
114    /// Creates a new append blob.
115    pub fn put_append_blob(&self) -> PutAppendBlobBuilder {
116        PutAppendBlobBuilder::new(self.clone())
117    }
118
119    /// Creates a new block blob, or update the content of an existing block blob.
120    pub fn put_block_blob(&self, body: impl Into<Body>) -> PutBlockBlobBuilder {
121        PutBlockBlobBuilder::new(self.clone(), body.into())
122    }
123
124    /// Copy the blob to a destination within the storage account.
125    pub fn copy(&self, copy_source: Url) -> CopyBlobBuilder {
126        CopyBlobBuilder::new(self.clone(), copy_source)
127    }
128
129    /// Copy the blob to a destination within the storage account synchronously.
130    pub fn copy_from_url(&self, copy_source: Url) -> CopyBlobFromUrlBuilder {
131        CopyBlobFromUrlBuilder::new(self.clone(), copy_source)
132    }
133
134    /// Create a lease on the blob to lock for write and delete operations.
135    pub fn acquire_lease<LD: Into<LeaseDuration>>(
136        &self,
137        lease_duration: LD,
138    ) -> AcquireLeaseBuilder {
139        AcquireLeaseBuilder::new(self.clone(), lease_duration.into())
140    }
141
142    /// End the lease but ensure that another client cannot acquire a new lease until the current lease period has expired.
143    pub fn break_lease(&self) -> BreakLeaseBuilder {
144        BreakLeaseBuilder::new(self.clone())
145    }
146
147    /// Delete the blob.
148    pub fn delete(&self) -> DeleteBlobBuilder {
149        DeleteBlobBuilder::new(self.clone())
150    }
151
152    /// Delete a snapshot of the blob.
153    pub fn delete_snapshot(&self, snapshot: Snapshot) -> DeleteBlobSnapshotBuilder {
154        DeleteBlobSnapshotBuilder::new(self.clone(), snapshot)
155    }
156
157    /// Delete the blob at a specific version.
158    pub fn delete_version_id(&self, version_id: VersionId) -> DeleteBlobVersionBuilder {
159        DeleteBlobVersionBuilder::new(self.clone(), version_id)
160    }
161
162    /* Operations specific to certain blob types */
163
164    /// Creates a new block to be committed as part of a block blob.
165    pub fn put_block(
166        &self,
167        block_id: impl Into<BlockId>,
168        body: impl Into<Body>,
169    ) -> PutBlockBuilder {
170        PutBlockBuilder::new(self.clone(), block_id.into(), body.into())
171    }
172
173    /// Creates a new block to be committed as part of a block blob, from a URL.
174    pub fn put_block_url(
175        &self,
176        block_id: impl Into<BlockId>,
177        copy_source: Url,
178    ) -> PutBlockUrlBuilder {
179        PutBlockUrlBuilder::new(self.clone(), block_id.into(), copy_source)
180    }
181
182    /// Retrieve the list of blocks that have been uploaded as part of a block blob.
183    pub fn get_block_list(&self) -> GetBlockListBuilder {
184        GetBlockListBuilder::new(self.clone())
185    }
186
187    /// Retrieve the user-defined tags for the specified blob, version, or snapshot.
188    pub fn get_tags(&self) -> GetTagsBuilder {
189        GetTagsBuilder::new(self.clone())
190    }
191
192    /// Set user-defined tags for the specified blob, version, or snapshot.
193    pub fn set_tags(&self, tags: impl Into<Tags>) -> SetTagsBuilder {
194        SetTagsBuilder::new(self.clone(), tags.into())
195    }
196
197    /// Write a block blob by specifying the list of block IDs that make up the blob.
198    ///
199    /// In order to be written as part of a blob, a block must have been successfully written to the server in a prior Put Block operation.
200    pub fn put_block_list(&self, block_list: BlockList) -> PutBlockListBuilder {
201        PutBlockListBuilder::new(self.clone(), block_list)
202    }
203
204    /// Write a range of pages to a page blob.
205    pub fn put_page(&self, ba512_range: BA512Range, content: impl Into<Body>) -> PutPageBuilder {
206        PutPageBuilder::new(self.clone(), ba512_range, content.into())
207    }
208
209    /// Return the list of valid page ranges for a page blob or snapshot of a page blob.
210    pub fn get_page_ranges(&self) -> GetPageRangesBuilder {
211        GetPageRangesBuilder::new(self.clone())
212    }
213
214    /// Commits a new block of data to the end of an existing append blob.
215    pub fn append_block(&self, body: impl Into<Body>) -> AppendBlockBuilder {
216        AppendBlockBuilder::new(self.clone(), body.into())
217    }
218
219    /// Clear range of pages in a page blob.
220    pub fn clear_page(&self, ba512_range: BA512Range) -> ClearPageBuilder {
221        ClearPageBuilder::new(self.clone(), ba512_range)
222    }
223
224    pub async fn user_delegation_shared_access_signature(
225        &self,
226        permissions: BlobSasPermissions,
227        user_delegation_key: &UserDeligationKey,
228    ) -> azure_core::Result<BlobSharedAccessSignature> {
229        let creds = self.container_client.credentials().0.read().await;
230        if !matches!(creds.deref(), StorageCredentialsInner::TokenCredential(_)) {
231            return Err(Error::message(
232                ErrorKind::Credential,
233                "User delegation access signature generation requires Token authentication",
234            ));
235        };
236
237        let service_client = self.container_client().service_client();
238
239        let account = service_client.account();
240
241        let canonicalized_resource = format!(
242            "/blob/{}/{}/{}",
243            account,
244            self.container_client.container_name(),
245            self.blob_name()
246        );
247        Ok(BlobSharedAccessSignature::new(
248            user_delegation_key.clone(),
249            canonicalized_resource,
250            permissions,
251            user_delegation_key.signed_expiry,
252            BlobSignedResource::Blob,
253        ))
254    }
255
256    /// Create a shared access signature.
257    pub async fn shared_access_signature(
258        &self,
259        permissions: BlobSasPermissions,
260        expiry: OffsetDateTime,
261    ) -> azure_core::Result<BlobSharedAccessSignature> {
262        let creds = self.container_client.credentials().0.read().await;
263        let StorageCredentialsInner::Key(account, key) = creds.deref() else {
264            return Err(Error::message(
265                ErrorKind::Credential,
266                "Shared access signature generation - SAS can be generated with access_key clients",
267            ));
268        };
269
270        let canonicalized_resource = format!(
271            "/blob/{}/{}/{}",
272            account,
273            self.container_client.container_name(),
274            self.blob_name()
275        );
276        Ok(BlobSharedAccessSignature::new(
277            key.clone(),
278            canonicalized_resource,
279            permissions,
280            expiry,
281            BlobSignedResource::Blob,
282        ))
283    }
284
285    /// Create a signed blob url
286    pub fn generate_signed_blob_url<T>(&self, signature: &T) -> azure_core::Result<Url>
287    where
288        T: SasToken,
289    {
290        let mut url = self.url()?;
291        url.set_query(Some(&signature.token()?));
292        Ok(url)
293    }
294
295    /// Check whether blob exists.
296    pub async fn exists(&self) -> azure_core::Result<bool> {
297        match self.get_properties().await {
298            Ok(_) => Ok(true),
299            Err(err)
300                if err
301                    .as_http_error()
302                    .map(|e| e.status() == StatusCode::NotFound)
303                    .unwrap_or_default() =>
304            {
305                Ok(false)
306            }
307            Err(err) => Err(err),
308        }
309    }
310
311    /// Create a blob snapshot
312    pub fn snapshot(&self) -> SnapshotBlobBuilder {
313        SnapshotBlobBuilder::new(self.clone())
314    }
315
316    pub fn blob_name(&self) -> &str {
317        &self.blob_name
318    }
319
320    /// Turn into a `BlobLeaseClient`
321    pub fn blob_lease_client(&self, lease_id: LeaseId) -> BlobLeaseClient {
322        BlobLeaseClient::new(self.clone(), lease_id)
323    }
324
325    pub fn container_client(&self) -> &ContainerClient {
326        &self.container_client
327    }
328
329    /// Full URL for the blob.
330    pub fn url(&self) -> azure_core::Result<Url> {
331        let mut url = self.container_client().url()?;
332        let parts = self.blob_name().trim_matches('/').split('/');
333        url.path_segments_mut()
334            .map_err(|()| Error::message(ErrorKind::DataConversion, "Invalid url"))?
335            .extend(parts);
336        Ok(url)
337    }
338
339    pub(crate) fn finalize_request(
340        url: Url,
341        method: Method,
342        headers: Headers,
343        request_body: Option<Body>,
344    ) -> azure_core::Result<Request> {
345        ContainerClient::finalize_request(url, method, headers, request_body)
346    }
347
348    pub(crate) async fn send(
349        &self,
350        context: &mut Context,
351        request: &mut Request,
352    ) -> azure_core::Result<Response> {
353        self.container_client.send(context, request).await
354    }
355}
356
357#[cfg(test)]
358mod tests {
359    use super::*;
360
361    #[test]
362    fn test_from_url() -> azure_core::Result<()> {
363        let path = "my/complex/nested/path/here";
364        let container = "mycontainer";
365        let account = "accountname";
366
367        let example = format!("https://{account}.blob.core.windows.net/{container}/{path}?token=1");
368        let url = Url::parse(&example)?;
369        let blob_client = BlobClient::from_sas_url(&url)?;
370
371        assert_eq!(blob_client.blob_name(), path);
372        assert_eq!(blob_client.container_client().container_name(), container);
373
374        let creds = blob_client
375            .container_client
376            .credentials()
377            .0
378            .try_read()
379            .expect("creds should be unlocked at this point");
380        assert!(matches!(
381            creds.deref(),
382            StorageCredentialsInner::SASToken(_)
383        ));
384
385        let url = Url::parse("https://accountname.blob.core.windows.net/mycontainer/myblob")?;
386        let blob_client = BlobClient::from_sas_url(&url)?;
387        let creds = blob_client
388            .container_client
389            .credentials()
390            .0
391            .try_read()
392            .expect("creds should be unlocked at this point");
393        assert!(matches!(creds.deref(), StorageCredentialsInner::Anonymous));
394
395        let url = Url::parse("https://accountname.blob.core.windows.net/mycontainer?token=1")?;
396        assert!(BlobClient::from_sas_url(&url).is_err(), "missing path");
397
398        let url = Url::parse("https://accountname.blob.core.windows.net/?token=1")?;
399        assert!(BlobClient::from_sas_url(&url).is_err(), "missing container");
400
401        let example =
402            format!("https://{account}.blob.core.chinacloudapi.cn/{container}/{path}?token=1");
403        let url = Url::parse(&example)?;
404        let blob_client = BlobClient::from_sas_url(&url)?;
405
406        assert_eq!(blob_client.blob_name(), path);
407        assert_eq!(blob_client.container_client().container_name(), container);
408
409        Ok(())
410    }
411
412    struct FakeSas {
413        token: String,
414    }
415    impl SasToken for FakeSas {
416        fn token(&self) -> azure_core::Result<String> {
417            Ok(self.token.clone())
418        }
419    }
420
421    fn build_url(container_name: &str, blob_name: &str, sas: &FakeSas) -> Url {
422        let blob_client = ClientBuilder::emulator().blob_client(container_name, blob_name);
423        blob_client
424            .generate_signed_blob_url(sas)
425            .expect("build url failed")
426    }
427
428    #[test]
429    fn test_generate_url() {
430        let sas = FakeSas {
431            token: "fake_token".to_owned(),
432        };
433
434        let url = build_url("a", "b", &sas);
435        assert_eq!(
436            url.as_str(),
437            "http://127.0.0.1:10000/devstoreaccount1/a/b?fake_token"
438        );
439
440        let url = build_url("a", "b/c/d", &sas);
441        assert_eq!(
442            url.as_str(),
443            "http://127.0.0.1:10000/devstoreaccount1/a/b/c/d?fake_token"
444        );
445
446        let url = build_url("a", "/b/c/d", &sas);
447        assert_eq!(
448            url.as_str(),
449            "http://127.0.0.1:10000/devstoreaccount1/a/b/c/d?fake_token"
450        );
451
452        let url = build_url("a", "b/c/d/hi there", &sas);
453        assert_eq!(
454            url.as_str(),
455            "http://127.0.0.1:10000/devstoreaccount1/a/b/c/d/hi%20there?fake_token"
456        );
457    }
458}