1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
use anyhow::{bail, Context};
use aws_sdk_kinesis::types::{Blob, SdkError};
use rand::distributions::Alphanumeric;
use rand::{thread_rng, Rng};
use mz_ore::retry::Retry;
use crate::action::{ControlFlow, State};
use crate::parser::BuiltinCommand;
pub async fn run_ingest(
mut cmd: BuiltinCommand,
state: &mut State,
) -> Result<ControlFlow, anyhow::Error> {
let stream_prefix = format!("testdrive-{}", cmd.args.string("stream")?);
match cmd.args.string("format")?.as_str() {
"bytes" => (),
f => bail!("unsupported message format for Kinesis: {}", f),
}
cmd.args.done()?;
let stream_name = format!("{}-{}", stream_prefix, state.seed);
for row in cmd.input {
let random_partition_key: String = thread_rng()
.sample_iter(&Alphanumeric)
.take(30)
.map(char::from)
.collect();
Retry::default()
.max_duration(state.default_timeout)
.retry_async_canceling(|_| async {
match state
.kinesis_client
.put_record()
.data(Blob::new(row.as_bytes()))
.partition_key(&random_partition_key)
.stream_name(&stream_name)
.send()
.await
{
Ok(_output) => Ok(()),
Err(SdkError::ServiceError(err))
if err.err().is_resource_not_found_exception() =>
{
bail!("resource not found: {}", SdkError::ServiceError(err))
}
Err(err) => Err(err).context("putting Kinesis record"),
}
})
.await
.context("putting Kinesis record")?;
}
Ok(ControlFlow::Continue)
}