mz_s3_datagen/
main.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::{io, iter};
11
12use aws_sdk_s3::operation::create_bucket::CreateBucketError;
13use aws_sdk_s3::types::{BucketLocationConstraint, CreateBucketConfiguration};
14use clap::Parser;
15use futures::stream::{self, StreamExt, TryStreamExt};
16use mz_ore::cast::CastFrom;
17use mz_ore::cli::{self, CliConfig};
18use mz_ore::error::ErrorExt;
19use tracing::{Level, error, event, info};
20use tracing_subscriber::filter::EnvFilter;
21
// Command-line arguments for the S3 data generator, parsed via clap's derive.
// The `///` comments below are consumed by the derive as user-facing help
// text, so maintainer-only notes are written as plain `//` comments.
/// Generate meaningless data in S3 to test download speeds
#[derive(Parser)]
struct Args {
    // `run` builds each line as this many `A` characters followed by a single
    // `\n`, so the emitted line is one byte longer than this value.
    /// How large to make each line (record) in Bytes
    #[clap(short = 'l', long)]
    line_bytes: usize,

    // Parsed by `parse_object_size`. `run` treats this as an upper bound: it
    // concatenates whole lines and never produces a payload larger than this.
    /// How large to make each object, e.g. `1 KiB`
    #[clap(
        short = 's',
        long,
        value_parser = parse_object_size,
    )]
    object_size: usize,

    // `run` uploads the first object and creates the remaining ones as copies
    // of it.
    /// How many objects to create
    #[clap(short = 'c', long)]
    object_count: usize,

    // Object keys are this prefix immediately followed by the object index,
    // zero-padded to at least five digits (no separator is inserted).
    /// All objects will be inserted into this prefix
    #[clap(short = 'p', long)]
    key_prefix: String,

    // `run` attempts to create this bucket and reuses it when the service
    // reports that the caller already owns it.
    /// All objects will be inserted into this bucket
    #[clap(short = 'b', long)]
    bucket: String,

    // NOTE(review): `run` as shown takes the region from the configuration
    // loaded via `mz_aws_util::defaults()` and does not appear to read this
    // field -- confirm whether this flag is meant to override it.
    /// Which region to operate in
    #[clap(short = 'r', long, default_value = "us-east-1")]
    region: String,

    // Used as the `buffer_unordered` limit for the copy requests in `run`.
    /// Number of copy operations to run concurrently
    #[clap(long, default_value = "50")]
    concurrent_copies: usize,

    // Converted into a `tracing_subscriber` `EnvFilter` in `run`.
    /// Which log messages to emit.
    ///
    /// See environmentd's `--log-filter` option for details.
    #[clap(long, value_name = "FILTER", default_value = "off")]
    log_filter: String,
}
63
64#[tokio::main]
65async fn main() {
66    if let Err(e) = run().await {
67        error!("{}", e.display_with_causes());
68        std::process::exit(1);
69    }
70}
71
72async fn run() -> anyhow::Result<()> {
73    let args: Args = cli::parse_args(CliConfig::default());
74
75    tracing_subscriber::fmt()
76        .with_env_filter(EnvFilter::from(args.log_filter))
77        .with_writer(io::stderr)
78        .init();
79
80    info!(
81        "starting up to create {} of data across {} objects in {}/{}",
82        bytefmt::format(u64::cast_from(args.object_size * args.object_count)),
83        args.object_count,
84        args.bucket,
85        args.key_prefix
86    );
87
88    let line = iter::repeat('A')
89        .take(args.line_bytes)
90        .chain(iter::once('\n'))
91        .collect::<String>();
92    let mut object_size = 0;
93    let line_size = line.len();
94    let object = iter::repeat(line)
95        .take_while(|_| {
96            object_size += line_size;
97            object_size < args.object_size
98        })
99        .collect::<String>();
100
101    let config = mz_aws_util::defaults().load().await;
102    let client = mz_aws_util::s3::new_client(&config);
103
104    let first_object_key = format!("{}{:>05}", args.key_prefix, 0);
105
106    let progressbar = indicatif::ProgressBar::new(u64::cast_from(args.object_count));
107
108    let bucket_config = match config.region().map(|r| r.as_ref()) {
109        // us-east-1 is special and is not accepted as a location constraint.
110        None | Some("us-east-1") => None,
111        Some(r) => Some(
112            CreateBucketConfiguration::builder()
113                .location_constraint(BucketLocationConstraint::from(r))
114                .build(),
115        ),
116    };
117    client
118        .create_bucket()
119        .bucket(&args.bucket)
120        .set_create_bucket_configuration(bucket_config)
121        .send()
122        .await
123        .map(|_| info!("created s3 bucket {}", args.bucket))
124        .or_else(|e| match e.into_service_error() {
125            CreateBucketError::BucketAlreadyOwnedByYou(_) => {
126                event!(Level::INFO, bucket = %args.bucket, "reusing existing bucket");
127                Ok(())
128            }
129            e => Err(e),
130        })?;
131
132    let mut total_created = 0;
133    client
134        .put_object()
135        .bucket(&args.bucket)
136        .key(&first_object_key)
137        .body(object.into_bytes().into())
138        .send()
139        .await?;
140    total_created += 1;
141    progressbar.inc(1);
142
143    let copy_source = format!("{}/{}", args.bucket, first_object_key.clone());
144
145    let copy_reqs = (1..args.object_count).map(|i| {
146        client
147            .copy_object()
148            .bucket(&args.bucket)
149            .copy_source(&copy_source)
150            .key(format!("{}{:>05}", args.key_prefix, i))
151            .send()
152    });
153    let mut copy_reqs_stream = stream::iter(copy_reqs).buffer_unordered(args.concurrent_copies);
154    while let Some(_) = copy_reqs_stream.try_next().await? {
155        progressbar.inc(1);
156        total_created += 1;
157    }
158    drop(progressbar);
159
160    info!("created {} objects", total_created);
161    assert_eq!(total_created, args.object_count);
162
163    Ok(())
164}
165
166fn parse_object_size(s: &str) -> Result<usize, &'static str> {
167    bytefmt::parse(s).map(usize::cast_from)
168}