1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
910use std::time::Duration;
1112use aws_sdk_s3::config::Builder;
13use aws_sdk_s3::presigning::PresigningConfig;
14use aws_types::sdk_config::SdkConfig;
15use bytes::Bytes;
1617pub use aws_sdk_s3::Client;
1819/// Creates a new client from an [SDK config](aws_types::sdk_config::SdkConfig)
20/// with Materialize-specific customizations.
21///
22/// Specifically, if the SDK config overrides the endpoint URL, the client
23/// will be configured to use path-style addressing, as custom AWS endpoints
24/// typically do not support virtual host-style addressing.
25pub fn new_client(sdk_config: &SdkConfig) -> Client {
26let conf = Builder::from(sdk_config)
27 .force_path_style(sdk_config.endpoint_url().is_some())
28 .build();
29 Client::from_conf(conf)
30}
3132/// Returns a default [`PresigningConfig`] for tests.
33pub fn new_presigned_config() -> PresigningConfig {
34// Expire the URL in 5 minutes.
35PresigningConfig::expires_in(Duration::from_secs(5 * 60)).expect("known valid")
36}
3738pub async fn list_bucket_path(
39 client: &Client,
40 bucket: &str,
41 prefix: &str,
42) -> Result<Option<Vec<String>>, anyhow::Error> {
43let res = client
44 .list_objects_v2()
45 .bucket(bucket)
46 .prefix(prefix)
47 .send()
48 .await?;
49 res.contents
50 .map(|objs| {
51 objs.into_iter()
52 .map(|obj| {
53 obj.key
54 .ok_or(anyhow::anyhow!("key not provided from list_objects_v2"))
55 })
56 .collect::<Result<Vec<String>, _>>()
57 })
58 .transpose()
59}
6061/// A wrapper around [`ByteStream`] that implements the [`futures::stream::Stream`] trait.
62///
63/// [`ByteStream`]: aws_smithy_types::byte_stream::ByteStream
64#[pin_project::pin_project]
65pub struct ByteStreamAdapter {
66#[pin]
67inner: aws_smithy_types::byte_stream::ByteStream,
68}
6970impl ByteStreamAdapter {
71pub fn new(bytes: aws_smithy_types::byte_stream::ByteStream) -> Self {
72 ByteStreamAdapter { inner: bytes }
73 }
74}
7576impl futures::stream::Stream for ByteStreamAdapter {
77type Item = Result<Bytes, aws_smithy_types::byte_stream::error::Error>;
7879fn poll_next(
80self: std::pin::Pin<&mut Self>,
81 cx: &mut std::task::Context<'_>,
82 ) -> std::task::Poll<Option<Self::Item>> {
83let this = self.project();
84 aws_smithy_types::byte_stream::ByteStream::poll_next(this.inner, cx)
85 }
86}