1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! AWS Kinesis client and utilities.

use aws_sdk_kinesis::error::{GetShardIteratorError, ListShardsError};
use aws_sdk_kinesis::model::{Shard, ShardIteratorType};
use aws_sdk_kinesis::types::SdkError;
use aws_sdk_kinesis::Client;
use aws_types::SdkConfig;

use crate::util;

/// Builds an AWS Kinesis client from the given SDK configuration.
///
/// The client is wired up with the connector returned by `util::connector`,
/// so that it respects the system proxy configuration.
pub fn client(config: &SdkConfig) -> Client {
    let kinesis_config = config.into();
    let connector = util::connector();
    Client::from_conf_conn(kinesis_config, connector)
}

/// Lists the shards of the named Kinesis stream.
///
/// This function wraps the `ListShards` API call. It returns all shards in a
/// given Kinesis stream, automatically handling pagination if required.
///
/// The first request identifies the stream by name. Every follow-up request
/// carries only the pagination token returned by the previous response.
///
/// # Errors
///
/// Any errors from the underlying `ListShards` API call are surfaced
/// directly. If an error occurs partway through pagination, the shards
/// collected so far are discarded.
pub async fn list_shards(
    client: &Client,
    stream_name: &str,
) -> Result<Vec<Shard>, SdkError<ListShardsError>> {
    let mut next_token: Option<String> = None;
    let mut shards = Vec::new();
    loop {
        let request = client.list_shards();
        // The `ListShards` API does not allow `StreamName` to be specified
        // together with `NextToken`: the token alone identifies the stream.
        // Send exactly one of the two on each request.
        let request = match next_token.take() {
            Some(token) => request.next_token(token),
            None => request.stream_name(stream_name),
        };
        let res = request.send().await?;
        // A response without a shard list contributes no shards.
        shards.extend(res.shards.unwrap_or_default());
        match res.next_token {
            Some(token) => next_token = Some(token),
            // No token means this was the final page.
            None => return Ok(shards),
        }
    }
}

/// Gets the shard IDs of the named Kinesis stream.
///
/// This function is like [`list_shards`], but yields only the ID of each
/// shard rather than the full shard description. A shard whose ID is absent
/// from the response is represented by an empty string.
///
/// # Errors
///
/// Any errors from the underlying `ListShards` API call are surfaced
/// directly.
pub async fn get_shard_ids(
    client: &Client,
    stream_name: &str,
) -> Result<impl Iterator<Item = String>, SdkError<ListShardsError>> {
    let shards = list_shards(client, stream_name).await?;
    let shard_ids = shards
        .into_iter()
        .map(|shard| shard.shard_id.unwrap_or_default());
    Ok(shard_ids)
}

/// Constructs an iterator over a Kinesis shard.
///
/// This function is a wrapper around the `GetShardIterator` API. It requests
/// the `TRIM_HORIZON` shard iterator for the given stream and shard, i.e. an
/// iterator positioned at the oldest data record in the shard.
///
/// Returns `None` if the response does not contain a shard iterator.
///
/// # Errors
///
/// Any errors from the underlying `GetShardIterator` API call are surfaced
/// directly.
pub async fn get_shard_iterator(
    client: &Client,
    stream_name: &str,
    shard_id: &str,
) -> Result<Option<String>, SdkError<GetShardIteratorError>> {
    // Assemble the request first, then send it as a separate step.
    let request = client
        .get_shard_iterator()
        .shard_iterator_type(ShardIteratorType::TrimHorizon)
        .shard_id(shard_id)
        .stream_name(stream_name);
    let output = request.send().await?;
    Ok(output.shard_iterator)
}