mz_aws_util/
s3.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::time::Duration;
11
12use aws_sdk_s3::config::Builder;
13use aws_sdk_s3::presigning::PresigningConfig;
14use aws_types::sdk_config::SdkConfig;
15use bytes::Bytes;
16
17pub use aws_sdk_s3::Client;
18
19/// Creates a new client from an [SDK config](aws_types::sdk_config::SdkConfig)
20/// with Materialize-specific customizations.
21///
22/// Specifically, if the SDK config overrides the endpoint URL, the client
23/// will be configured to use path-style addressing, as custom AWS endpoints
24/// typically do not support virtual host-style addressing.
25pub fn new_client(sdk_config: &SdkConfig) -> Client {
26    let conf = Builder::from(sdk_config)
27        .force_path_style(sdk_config.endpoint_url().is_some())
28        .build();
29    Client::from_conf(conf)
30}
31
32/// Returns a default [`PresigningConfig`] for tests.
33pub fn new_presigned_config() -> PresigningConfig {
34    // Expire the URL in 5 minutes.
35    PresigningConfig::expires_in(Duration::from_secs(5 * 60)).expect("known valid")
36}
37
38pub async fn list_bucket_path(
39    client: &Client,
40    bucket: &str,
41    prefix: &str,
42) -> Result<Option<Vec<String>>, anyhow::Error> {
43    let res = client
44        .list_objects_v2()
45        .bucket(bucket)
46        .prefix(prefix)
47        .send()
48        .await?;
49    res.contents
50        .map(|objs| {
51            objs.into_iter()
52                .map(|obj| {
53                    obj.key
54                        .ok_or(anyhow::anyhow!("key not provided from list_objects_v2"))
55                })
56                .collect::<Result<Vec<String>, _>>()
57        })
58        .transpose()
59}
60
61/// A wrapper around [`ByteStream`] that implements the [`futures::stream::Stream`] trait.
62///
63/// [`ByteStream`]: aws_smithy_types::byte_stream::ByteStream
64#[pin_project::pin_project]
65pub struct ByteStreamAdapter {
66    #[pin]
67    inner: aws_smithy_types::byte_stream::ByteStream,
68}
69
70impl ByteStreamAdapter {
71    pub fn new(bytes: aws_smithy_types::byte_stream::ByteStream) -> Self {
72        ByteStreamAdapter { inner: bytes }
73    }
74}
75
76impl futures::stream::Stream for ByteStreamAdapter {
77    type Item = Result<Bytes, aws_smithy_types::byte_stream::error::Error>;
78
79    fn poll_next(
80        self: std::pin::Pin<&mut Self>,
81        cx: &mut std::task::Context<'_>,
82    ) -> std::task::Poll<Option<Self::Item>> {
83        let this = self.project();
84        aws_smithy_types::byte_stream::ByteStream::poll_next(this.inner, cx)
85    }
86}