1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
use anyhow::{anyhow, Context};
use log::debug;
use rusoto_core::HttpClient;
use rusoto_credential::{AutoRefreshingProvider, AwsCredentials, ChainProvider, StaticProvider};
use rusoto_kinesis::KinesisClient;
use rusoto_s3::S3Client;
use rusoto_sqs::SqsClient;
use crate::aws::ConnectInfo;
/// Builds a rusoto [`HttpClient`] backed by the connector that
/// `mz_http_proxy::hyper::connector()` returns.
///
/// # Errors
///
/// If the connector cannot be created, the underlying failure is converted
/// into an [`anyhow::Error`] and returned.
pub(crate) fn http() -> Result<HttpClient<mz_http_proxy::hyper::Connector>, anyhow::Error> {
    mz_http_proxy::hyper::connector()
        .map(HttpClient::from_connector)
        .map_err(|e| anyhow!(e))
}
// Generates a public constructor function for a rusoto service client.
//
// * `gen_client_builder!(Client, fn_name)` uses the stringified client type
//   as the human-readable name in docs, logs and error contexts.
// * `gen_client_builder!(Client, fn_name, "name")` supplies that name
//   explicitly.
macro_rules! gen_client_builder {
    ($client:ident, $name:ident) => {
        gen_client_builder!($client, $name, stringify!($client));
    };
    ($client:ident, $name:ident, $client_name:expr) => {
        #[doc = "Construct a "]
        #[doc = $client_name]
        ///
        /// When `conn_info` carries explicit credentials they are used as-is
        /// through a [`StaticProvider`]. Otherwise the client falls back to
        /// credentials gathered by rusoto's [`ChainProvider`] wrapped in an
        /// [`AutoRefreshingProvider`].
        ///
        /// The [`AutoRefreshingProvider`] caches the underlying provider's AWS
        /// credentials, automatically fetching updated credentials if they've
        /// expired.
        ///
        /// # Errors
        ///
        /// Fails if the HTTP client cannot be created, or if the
        /// auto-refreshing credentials provider cannot be constructed.
        pub fn $name(conn_info: ConnectInfo) -> Result<$client, anyhow::Error> {
            // The dispatcher is built first so that an HTTP setup failure is
            // reported before any credential handling or logging happens.
            let dispatcher =
                http().context(concat!("creating HTTP client for ", $client_name))?;

            match conn_info.credentials {
                Some(creds) => {
                    debug!(concat!(
                        "Creating a new ",
                        $client_name,
                        " from provided access_key and secret_access_key"
                    ));
                    let static_creds = StaticProvider::from(AwsCredentials::from(creds));
                    Ok($client::new_with(dispatcher, static_creds, conn_info.region))
                }
                None => {
                    debug!(concat!(
                        "AWS access_key_id and secret_access_key not provided, ",
                        "creating a new ",
                        $client_name,
                        " using a chain provider."
                    ));
                    let mut chain = ChainProvider::new();
                    chain.set_timeout($crate::aws::AUTH_TIMEOUT);
                    let refreshing = AutoRefreshingProvider::new(chain).context(concat!(
                        "generating AWS credentials refreshing provider for ",
                        $client_name
                    ))?;
                    Ok($client::new_with(dispatcher, refreshing, conn_info.region))
                }
            }
        }
    };
}
// Instantiate one public builder function per AWS service client:
// `s3`, `sqs` and `kinesis`, each taking a `ConnectInfo` and returning the
// corresponding rusoto client. The two-argument form is used, so the client
// type name doubles as the display name in docs, logs and error contexts.
gen_client_builder!(S3Client, s3);
gen_client_builder!(SqsClient, sqs);
gen_client_builder!(KinesisClient, kinesis);