// mz_s3_datagen/main.rs
use std::{io, iter};

use anyhow::Context;
use aws_sdk_s3::config::Region;
use aws_sdk_s3::operation::create_bucket::CreateBucketError;
use aws_sdk_s3::types::{BucketLocationConstraint, CreateBucketConfiguration};
use clap::Parser;
use futures::stream::{self, StreamExt, TryStreamExt};
use mz_ore::cast::CastFrom;
use mz_ore::cli::{self, CliConfig};
use mz_ore::error::ErrorExt;
use tracing::{Level, error, event, info};
use tracing_subscriber::filter::EnvFilter;

22#[derive(Parser)]
24struct Args {
25 #[clap(short = 'l', long)]
27 line_bytes: usize,
28
29 #[clap(
31 short = 's',
32 long,
33 value_parser = parse_object_size,
34 )]
35 object_size: usize,
36
37 #[clap(short = 'c', long)]
39 object_count: usize,
40
41 #[clap(short = 'p', long)]
43 key_prefix: String,
44
45 #[clap(short = 'b', long)]
47 bucket: String,
48
49 #[clap(short = 'r', long, default_value = "us-east-1")]
51 region: String,
52
53 #[clap(long, default_value = "50")]
55 concurrent_copies: usize,
56
57 #[clap(long, value_name = "FILTER", default_value = "off")]
61 log_filter: String,
62}
63
64#[tokio::main]
65async fn main() {
66 if let Err(e) = run().await {
67 error!("{}", e.display_with_causes());
68 std::process::exit(1);
69 }
70}
71
72async fn run() -> anyhow::Result<()> {
73 let args: Args = cli::parse_args(CliConfig::default());
74
75 tracing_subscriber::fmt()
76 .with_env_filter(EnvFilter::from(args.log_filter))
77 .with_writer(io::stderr)
78 .init();
79
80 info!(
81 "starting up to create {} of data across {} objects in {}/{}",
82 bytefmt::format(u64::cast_from(args.object_size * args.object_count)),
83 args.object_count,
84 args.bucket,
85 args.key_prefix
86 );
87
88 let line = iter::repeat('A')
89 .take(args.line_bytes)
90 .chain(iter::once('\n'))
91 .collect::<String>();
92 let mut object_size = 0;
93 let line_size = line.len();
94 let object = iter::repeat(line)
95 .take_while(|_| {
96 object_size += line_size;
97 object_size < args.object_size
98 })
99 .collect::<String>();
100
101 let config = mz_aws_util::defaults().load().await;
102 let client = mz_aws_util::s3::new_client(&config);
103
104 let first_object_key = format!("{}{:>05}", args.key_prefix, 0);
105
106 let progressbar = indicatif::ProgressBar::new(u64::cast_from(args.object_count));
107
108 let bucket_config = match config.region().map(|r| r.as_ref()) {
109 None | Some("us-east-1") => None,
111 Some(r) => Some(
112 CreateBucketConfiguration::builder()
113 .location_constraint(BucketLocationConstraint::from(r))
114 .build(),
115 ),
116 };
117 client
118 .create_bucket()
119 .bucket(&args.bucket)
120 .set_create_bucket_configuration(bucket_config)
121 .send()
122 .await
123 .map(|_| info!("created s3 bucket {}", args.bucket))
124 .or_else(|e| match e.into_service_error() {
125 CreateBucketError::BucketAlreadyOwnedByYou(_) => {
126 event!(Level::INFO, bucket = %args.bucket, "reusing existing bucket");
127 Ok(())
128 }
129 e => Err(e),
130 })?;
131
132 let mut total_created = 0;
133 client
134 .put_object()
135 .bucket(&args.bucket)
136 .key(&first_object_key)
137 .body(object.into_bytes().into())
138 .send()
139 .await?;
140 total_created += 1;
141 progressbar.inc(1);
142
143 let copy_source = format!("{}/{}", args.bucket, first_object_key.clone());
144
145 let copy_reqs = (1..args.object_count).map(|i| {
146 client
147 .copy_object()
148 .bucket(&args.bucket)
149 .copy_source(©_source)
150 .key(format!("{}{:>05}", args.key_prefix, i))
151 .send()
152 });
153 let mut copy_reqs_stream = stream::iter(copy_reqs).buffer_unordered(args.concurrent_copies);
154 while let Some(_) = copy_reqs_stream.try_next().await? {
155 progressbar.inc(1);
156 total_created += 1;
157 }
158 drop(progressbar);
159
160 info!("created {} objects", total_created);
161 assert_eq!(total_created, args.object_count);
162
163 Ok(())
164}
165
166fn parse_object_size(s: &str) -> Result<usize, &'static str> {
167 bytefmt::parse(s).map(usize::cast_from)
168}