mz_storage_operators/s3_oneshot_sink/pgcopy.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use anyhow::anyhow;
use aws_types::sdk_config::SdkConfig;
use mz_aws_util::s3_uploader::{
    CompletedUpload, S3MultiPartUploadError, S3MultiPartUploader, S3MultiPartUploaderConfig,
};
use mz_ore::assert_none;
use mz_ore::cast::CastFrom;
use mz_pgcopy::{CopyFormatParams, encode_copy_format, encode_copy_format_header};
use mz_repr::{GlobalId, RelationDesc, Row};
use mz_storage_types::sinks::s3_oneshot_sink::S3KeyManager;
use mz_storage_types::sinks::{S3SinkFormat, S3UploadInfo};
use tracing::info;

use super::{CopyToParameters, CopyToS3Uploader};

/// Required state to upload batches to S3.
pub(super) struct PgCopyUploader {
    /// The output description.
    desc: RelationDesc,
    /// Params to format the data.
    format: CopyFormatParams<'static>,
    /// The index of the current file within the batch.
    file_index: usize,
    /// Provides the appropriate bucket and object keys to use for uploads.
    key_manager: S3KeyManager,
    /// Identifies the batch that files uploaded by this uploader belong to.
    batch: u64,
    /// The desired file size. A new file upload will be started
    /// when the size exceeds this amount.
    max_file_size: u64,
    /// The AWS SDK config.
    /// Held in an `Option` so that an owned value can be taken later and moved
    /// into a spawned tokio task.
    sdk_config: Option<SdkConfig>,
    /// Multi-part uploader for the current file.
    /// Held in an `Option` so that an owned value can be taken later.
    current_file_uploader: Option<S3MultiPartUploader>,
    /// Upload parameters.
    params: CopyToParameters,
}

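// A minimal usage sketch, mirroring how the test at the bottom of this file drives the
// uploader (names like `upload_info` and `rows` are illustrative): construct one uploader per
// batch, append rows, and call `finish` to complete any in-progress upload.
//
//     let mut uploader =
//         PgCopyUploader::new(sdk_config, upload_info, &sink_id, batch, params)?;
//     for row in &rows {
//         uploader.append_row(row).await?;
//     }
//     uploader.finish().await?;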
impl CopyToS3Uploader for PgCopyUploader {
    fn new(
        sdk_config: SdkConfig,
        connection_details: S3UploadInfo,
        sink_id: &GlobalId,
        batch: u64,
        params: CopyToParameters,
    ) -> Result<PgCopyUploader, anyhow::Error> {
        match connection_details.format {
            S3SinkFormat::PgCopy(format_params) => Ok(PgCopyUploader {
                desc: connection_details.desc,
                sdk_config: Some(sdk_config),
                format: format_params,
                key_manager: S3KeyManager::new(sink_id, &connection_details.uri),
                batch,
                max_file_size: connection_details.max_file_size,
                file_index: 0,
                current_file_uploader: None,
                params,
            }),
            _ => anyhow::bail!("Expected PgCopy format"),
        }
    }

    /// Finishes any remaining in-progress upload.
    async fn finish(&mut self) -> Result<(), anyhow::Error> {
        if let Some(uploader) = self.current_file_uploader.take() {
            // Move the AWS S3 calls onto a tokio task instead of the timely runtime.
            let handle =
                mz_ore::task::spawn(|| "s3_uploader::finish", async { uploader.finish().await });
            let CompletedUpload {
                part_count,
                total_bytes_uploaded,
                bucket,
                key,
            } = handle.await?;
            info!(
                "finished upload: bucket {}, key {}, bytes_uploaded {}, parts_uploaded {}",
                bucket, key, total_bytes_uploaded, part_count
            );
        }
        Ok(())
    }

    /// Appends the row to the in-progress upload, where it is buffered until it reaches the
    /// configured `part_size_limit`, at which point the `S3MultiPartUploader` uploads that part.
    /// If the row would exceed the max file size of the ongoing upload, a new
    /// `S3MultiPartUploader` for a new file is created and the row data is appended there.
    async fn append_row(&mut self, row: &Row) -> Result<(), anyhow::Error> {
        let mut buf: Vec<u8> = vec![];
        // Encode the row into a temporary buffer.
        encode_copy_format(&self.format, row, self.desc.typ(), &mut buf)
            .map_err(|_| anyhow!("error encoding row"))?;

        if self.current_file_uploader.is_none() {
            self.start_new_file_upload().await?;
        }
        let mut uploader = self.current_file_uploader.as_mut().expect("known exists");

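        // If buffering this chunk would push the current file past its size limit,
        // `buffer_chunk` returns `UploadExceedsMaxFileLimit` and we roll over to a new file.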
        match uploader.buffer_chunk(&buf) {
            Ok(_) => Ok(()),
            Err(S3MultiPartUploadError::UploadExceedsMaxFileLimit(_)) => {
                // Start a multi-part upload for the next file.
                self.start_new_file_upload().await?;
                uploader = self.current_file_uploader.as_mut().expect("known exists");
                uploader.buffer_chunk(&buf)?;
                Ok(())
            }
            Err(e) => Err(e.into()),
        }
    }

    async fn force_new_file(&mut self) -> Result<(), anyhow::Error> {
        self.start_new_file_upload().await
    }
}

impl PgCopyUploader {
    /// Creates the uploader for the next file and starts the multi-part upload.
    async fn start_new_file_upload(&mut self) -> Result<(), anyhow::Error> {
        self.finish().await?;
        assert_none!(self.current_file_uploader);

        self.file_index += 1;
        let object_key =
            self.key_manager
                .data_key(self.batch, self.file_index, self.format.file_extension());
        let bucket = self.key_manager.bucket.clone();
        info!("starting upload: bucket {}, key {}", &bucket, &object_key);
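        // Take the owned SDK config so it can be moved into the spawned task below; the task
        // hands it back alongside the uploader so it can be restored for the next file.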
        let sdk_config = self
            .sdk_config
            .take()
            .expect("sdk_config should always be present");
        let max_file_size = self.max_file_size;
        // Move the AWS S3 calls onto a tokio task instead of the timely runtime.
        let part_size_limit = u64::cast_from(self.params.s3_multipart_part_size_bytes);
        let handle = mz_ore::task::spawn(|| "s3_uploader::try_new", async move {
            let uploader = S3MultiPartUploader::try_new(
                &sdk_config,
                bucket,
                object_key,
                S3MultiPartUploaderConfig {
                    part_size_limit,
                    file_size_limit: max_file_size,
                },
            )
            .await;
            (uploader, sdk_config)
        });
        let (uploader, sdk_config) = handle.await;
        self.sdk_config = Some(sdk_config);
        let mut uploader = uploader?;
        if self.format.requires_header() {
            let mut buf: Vec<u8> = vec![];
            encode_copy_format_header(&self.format, &self.desc, &mut buf)
                .map_err(|_| anyhow!("error encoding header"))?;
            uploader.buffer_chunk(&buf)?;
        }
        self.current_file_uploader = Some(uploader);
        Ok(())
    }
}

/// On CI, these tests are enabled by adding the scratch-aws-access plugin
/// to the `cargo-test` step in `ci/test/pipeline.template.yml` and setting
/// `MZ_S3_UPLOADER_TEST_S3_BUCKET` in
/// `ci/test/cargo-test/mzcompose.py`.
///
/// For a Materialize developer, to opt in to these tests locally for
/// development, follow the AWS access guide:
///
/// ```text
/// https://www.notion.so/materialize/AWS-access-5fbd9513dcdc4e11a7591e8caa5f63fe
/// ```
///
/// then run `source src/aws-util/src/setup_test_env_mz.sh`. You will also need
/// to run `aws sso login` if you haven't done so recently.
#[cfg(test)]
mod tests {
    use bytesize::ByteSize;
    use mz_pgcopy::CopyFormatParams;
    use mz_repr::{ColumnName, Datum, SqlColumnType, SqlRelationType};
    use uuid::Uuid;

    use super::*;

    fn s3_bucket_path_for_test() -> Option<(String, String)> {
        let bucket = match std::env::var("MZ_S3_UPLOADER_TEST_S3_BUCKET") {
            Ok(bucket) => bucket,
            Err(_) => {
                if mz_ore::env::is_var_truthy("CI") {
                    panic!("CI is supposed to run this test but something has gone wrong!");
                }
                return None;
            }
        };

        let prefix = Uuid::new_v4().to_string();
        let path = format!("cargo_test/{}/file", prefix);
        Some((bucket, path))
    }

    #[mz_ore::test(tokio::test(flavor = "multi_thread"))]
    #[cfg_attr(coverage, ignore)] // https://github.com/MaterializeInc/database-issues/issues/5586
    #[cfg_attr(miri, ignore)] // error: unsupported operation: can't call foreign function `TLS_method` on OS `linux`
    #[ignore] // TODO: Reenable against minio so it can run locally
    async fn test_multiple_files() -> Result<(), anyhow::Error> {
        let sdk_config = mz_aws_util::defaults().load().await;
        let (bucket, path) = match s3_bucket_path_for_test() {
            Some(tuple) => tuple,
            None => return Ok(()),
        };
        let sink_id = GlobalId::User(123);
        let batch = 456;
        let typ: SqlRelationType = SqlRelationType::new(vec![SqlColumnType {
            scalar_type: mz_repr::SqlScalarType::String,
            nullable: true,
        }]);
        let column_names = vec![ColumnName::from("col1")];
        let desc = RelationDesc::new(typ, column_names.into_iter());
        let mut uploader = PgCopyUploader::new(
            sdk_config.clone(),
            S3UploadInfo {
                uri: format!("s3://{}/{}", bucket, path),
                // This is only for testing; users cannot set a value smaller than 16MB.
                max_file_size: ByteSize::b(6).as_u64(),
                desc,
                format: S3SinkFormat::PgCopy(CopyFormatParams::Csv(Default::default())),
            },
            &sink_id,
            batch,
            CopyToParameters {
                s3_multipart_part_size_bytes: 10 * 1024 * 1024,
                arrow_builder_buffer_ratio: 100,
                parquet_row_group_ratio: 100,
            },
        )?;
        let mut row = Row::default();
        // Even though this row exceeds max_file_size, it should still be uploaded in a single file.
        row.packer().push(Datum::from("1234567"));
        uploader.append_row(&row).await?;

        // Since the max_file_size is 6B, this row will be uploaded to a new file.
        row.packer().push(Datum::Null);
        uploader.append_row(&row).await?;

        row.packer().push(Datum::from("5678"));
        uploader.append_row(&row).await?;

        uploader.finish().await?;

        // Based on the max_file_size, the uploader should have uploaded two
        // files, ending in -0001.csv and -0002.csv.
        let s3_client = mz_aws_util::s3::new_client(&sdk_config);
        let first_file = s3_client
            .get_object()
            .bucket(bucket.clone())
            .key(format!(
                "{}/mz-{}-batch-{:04}-0001.csv",
                path, sink_id, batch
            ))
            .send()
            .await
            .unwrap();

        let body = first_file.body.collect().await.unwrap().into_bytes();
        let expected_body: &[u8] = b"1234567\n";
        assert_eq!(body, *expected_body);

        let second_file = s3_client
            .get_object()
            .bucket(bucket)
            .key(format!(
                "{}/mz-{}-batch-{:04}-0002.csv",
                path, sink_id, batch
            ))
            .send()
            .await
            .unwrap();

        let body = second_file.body.collect().await.unwrap().into_bytes();
        let expected_body: &[u8] = b"\n5678\n";
        assert_eq!(body, *expected_body);

        Ok(())
    }
}