mz_storage_operators/s3_oneshot_sink/pgcopy.rs

use anyhow::anyhow;
use aws_types::sdk_config::SdkConfig;
use mz_aws_util::s3_uploader::{
    CompletedUpload, S3MultiPartUploadError, S3MultiPartUploader, S3MultiPartUploaderConfig,
};
use mz_ore::assert_none;
use mz_ore::cast::CastFrom;
use mz_ore::task::JoinHandleExt;
use mz_pgcopy::{CopyFormatParams, encode_copy_format, encode_copy_format_header};
use mz_repr::{GlobalId, RelationDesc, Row};
use mz_storage_types::sinks::s3_oneshot_sink::S3KeyManager;
use mz_storage_types::sinks::{S3SinkFormat, S3UploadInfo};
use tracing::info;

use super::{CopyToParameters, CopyToS3Uploader};

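/// Uploader that writes rows to objects in S3 in the PostgreSQL COPY format,
/// starting a new object whenever `max_file_size` would be exceeded.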
pub(super) struct PgCopyUploader {
    /// The description of the relation whose rows are being uploaded.
    desc: RelationDesc,
    /// Parameters controlling how rows are encoded in the COPY format.
    format: CopyFormatParams<'static>,
    /// The index of the current file within the batch.
    file_index: usize,
    /// Provides the bucket and object keys to use for the uploads.
    key_manager: S3KeyManager,
    /// Identifies the batch that files uploaded by this uploader belong to.
    batch: u64,
    /// The desired maximum file size. A new file upload is started when the
    /// current file would exceed this amount.
    max_file_size: u64,
    /// The AWS SDK config, kept in an `Option` so an owned value can be
    /// moved into a spawned tokio task and returned afterwards.
    sdk_config: Option<SdkConfig>,
    /// The multi-part uploader for the file currently being written, if any.
    current_file_uploader: Option<S3MultiPartUploader>,
    /// Tuning parameters for the upload.
    params: CopyToParameters,
}

impl CopyToS3Uploader for PgCopyUploader {
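    /// Creates an uploader from the connection details, erroring if the
    /// configured format is not `PgCopy`.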
    fn new(
        sdk_config: SdkConfig,
        connection_details: S3UploadInfo,
        sink_id: &GlobalId,
        batch: u64,
        params: CopyToParameters,
    ) -> Result<PgCopyUploader, anyhow::Error> {
        match connection_details.format {
            S3SinkFormat::PgCopy(format_params) => Ok(PgCopyUploader {
                desc: connection_details.desc,
                sdk_config: Some(sdk_config),
                format: format_params,
                key_manager: S3KeyManager::new(sink_id, &connection_details.uri),
                batch,
                max_file_size: connection_details.max_file_size,
                file_index: 0,
                current_file_uploader: None,
                params,
            }),
            _ => anyhow::bail!("Expected PgCopy format"),
        }
    }

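    /// Finishes and flushes any file upload currently in progress.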
    async fn finish(&mut self) -> Result<(), anyhow::Error> {
        if let Some(uploader) = self.current_file_uploader.take() {
            // Drive the S3 finish call on a spawned tokio task and wait for
            // it to complete.
            let handle =
                mz_ore::task::spawn(|| "s3_uploader::finish", async { uploader.finish().await });
            let CompletedUpload {
                part_count,
                total_bytes_uploaded,
                bucket,
                key,
            } = handle.wait_and_assert_finished().await?;
            info!(
                "finished upload: bucket {}, key {}, bytes_uploaded {}, parts_uploaded {}",
                bucket, key, total_bytes_uploaded, part_count
            );
        }
        Ok(())
    }

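    /// Encodes the row in the COPY format and buffers it into the current
    /// file upload, starting a new file first if none is active or if the
    /// current one would exceed `max_file_size`.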
    async fn append_row(&mut self, row: &Row) -> Result<(), anyhow::Error> {
        let mut buf: Vec<u8> = vec![];
        // Encode the row in the configured COPY format.
        encode_copy_format(&self.format, row, self.desc.typ(), &mut buf)
            .map_err(|_| anyhow!("error encoding row"))?;

        if self.current_file_uploader.is_none() {
            self.start_new_file_upload().await?;
        }
        let mut uploader = self.current_file_uploader.as_mut().expect("known exists");

        match uploader.buffer_chunk(&buf) {
            Ok(_) => Ok(()),
            Err(S3MultiPartUploadError::UploadExceedsMaxFileLimit(_)) => {
                // The current file would exceed the size limit; finish it,
                // start a new file, and buffer the chunk there instead.
                self.start_new_file_upload().await?;
                uploader = self.current_file_uploader.as_mut().expect("known exists");
                uploader.buffer_chunk(&buf)?;
                Ok(())
            }
            Err(e) => Err(e.into()),
        }
    }

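    /// Forces a rollover to a new file, finishing any upload in progress.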
    async fn force_new_file(&mut self) -> Result<(), anyhow::Error> {
        self.start_new_file_upload().await
    }
}

impl PgCopyUploader {
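    /// Creates the uploader for the next file in the batch, finishing the
    /// upload of the previous file if one is in progress.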
    async fn start_new_file_upload(&mut self) -> Result<(), anyhow::Error> {
        // Finish any upload that is currently in progress.
        self.finish().await?;
        assert_none!(self.current_file_uploader);

        self.file_index += 1;
        let object_key =
            self.key_manager
                .data_key(self.batch, self.file_index, self.format.file_extension());
        let bucket = self.key_manager.bucket.clone();
        info!("starting upload: bucket {}, key {}", &bucket, &object_key);
        let sdk_config = self
            .sdk_config
            .take()
            .expect("sdk_config should always be present");
        let max_file_size = self.max_file_size;
        let part_size_limit = u64::cast_from(self.params.s3_multipart_part_size_bytes);
        // Construct the uploader on a spawned tokio task, moving the SDK
        // config in and returning it so it can be reused for later files.
        let handle = mz_ore::task::spawn(|| "s3_uploader::try_new", async move {
            let uploader = S3MultiPartUploader::try_new(
                &sdk_config,
                bucket,
                object_key,
                S3MultiPartUploaderConfig {
                    part_size_limit,
                    file_size_limit: max_file_size,
                },
            )
            .await;
            (uploader, sdk_config)
        });
        let (uploader, sdk_config) = handle.wait_and_assert_finished().await;
        self.sdk_config = Some(sdk_config);
        let mut uploader = uploader?;
        if self.format.requires_header() {
            // Write the format's header (e.g. the CSV column names) at the
            // start of the new file.
            let mut buf: Vec<u8> = vec![];
            encode_copy_format_header(&self.format, &self.desc, &mut buf)
                .map_err(|_| anyhow!("error encoding header"))?;
            uploader.buffer_chunk(&buf)?;
        }
        self.current_file_uploader = Some(uploader);
        Ok(())
    }
}

#[cfg(test)]
// These tests run against a real S3 bucket: set MZ_S3_UPLOADER_TEST_S3_BUCKET
// to enable them. Outside of CI they are silently skipped when the variable
// is unset.
mod tests {
    use bytesize::ByteSize;
    use mz_pgcopy::CopyFormatParams;
    use mz_repr::{ColumnName, Datum, SqlColumnType, SqlRelationType};
    use uuid::Uuid;

    use super::*;

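    /// Returns the configured test bucket and a fresh object path, or `None`
    /// when the test should be skipped.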
    fn s3_bucket_path_for_test() -> Option<(String, String)> {
        let bucket = match std::env::var("MZ_S3_UPLOADER_TEST_S3_BUCKET") {
            Ok(bucket) => bucket,
            Err(_) => {
                if mz_ore::env::is_var_truthy("CI") {
                    panic!("CI is supposed to run this test but something has gone wrong!");
                }
                return None;
            }
        };

        let prefix = Uuid::new_v4().to_string();
        let path = format!("cargo_test/{}/file", prefix);
        Some((bucket, path))
    }

    #[mz_ore::test(tokio::test(flavor = "multi_thread"))]
    #[cfg_attr(coverage, ignore)]
    #[cfg_attr(miri, ignore)]
    #[ignore]
    async fn test_multiple_files() -> Result<(), anyhow::Error> {
        let sdk_config = mz_aws_util::defaults().load().await;
        let (bucket, path) = match s3_bucket_path_for_test() {
            Some(tuple) => tuple,
            None => return Ok(()),
        };
        let sink_id = GlobalId::User(123);
        let batch = 456;
        let typ: SqlRelationType = SqlRelationType::new(vec![SqlColumnType {
            scalar_type: mz_repr::SqlScalarType::String,
            nullable: true,
        }]);
        let column_names = vec![ColumnName::from("col1")];
        let desc = RelationDesc::new(typ, column_names.into_iter());
        let mut uploader = PgCopyUploader::new(
            sdk_config.clone(),
            S3UploadInfo {
                uri: format!("s3://{}/{}", bucket, path),
                // A tiny file size limit, so that the rows appended below are
                // split across multiple files.
                max_file_size: ByteSize::b(6).as_u64(),
                desc,
                format: S3SinkFormat::PgCopy(CopyFormatParams::Csv(Default::default())),
            },
            &sink_id,
            batch,
            CopyToParameters {
                s3_multipart_part_size_bytes: 10 * 1024 * 1024,
                arrow_builder_buffer_ratio: 100,
                parquet_row_group_ratio: 100,
            },
        )?;
        let mut row = Row::default();
        // Even though this row exceeds the file size limit on its own, it is
        // still written to the first file, which is then rolled over.
        row.packer().push(Datum::from("1234567"));
        uploader.append_row(&row).await?;

        // Goes into a second file, since the first one is already full.
        row.packer().push(Datum::Null);
        uploader.append_row(&row).await?;

        // Fits into the second file alongside the previous row.
        row.packer().push(Datum::from("5678"));
        uploader.append_row(&row).await?;

        uploader.finish().await?;

        let s3_client = mz_aws_util::s3::new_client(&sdk_config);
        // Verify the contents of the first file: just the first row.
        let first_file = s3_client
            .get_object()
            .bucket(bucket.clone())
            .key(format!(
                "{}/mz-{}-batch-{:04}-0001.csv",
                path, sink_id, batch
            ))
            .send()
            .await
            .unwrap();

        let body = first_file.body.collect().await.unwrap().into_bytes();
        let expected_body: &[u8] = b"1234567\n";
        assert_eq!(body, *expected_body);

        // Verify the contents of the second file: the remaining two rows.
        let second_file = s3_client
            .get_object()
            .bucket(bucket)
            .key(format!(
                "{}/mz-{}-batch-{:04}-0002.csv",
                path, sink_id, batch
            ))
            .send()
            .await
            .unwrap();

        let body = second_file.body.collect().await.unwrap().into_bytes();
        let expected_body: &[u8] = b"\n5678\n";
        assert_eq!(body, *expected_body);

        Ok(())
    }
}