mz_storage_operators/s3_oneshot_sink/pgcopy.rs

use anyhow::anyhow;
use aws_types::sdk_config::SdkConfig;
use mz_aws_util::s3_uploader::{
    CompletedUpload, S3MultiPartUploadError, S3MultiPartUploader, S3MultiPartUploaderConfig,
};
use mz_ore::assert_none;
use mz_ore::cast::CastFrom;
use mz_pgcopy::{CopyFormatParams, encode_copy_format, encode_copy_format_header};
use mz_repr::{GlobalId, RelationDesc, Row};
use mz_storage_types::sinks::s3_oneshot_sink::S3KeyManager;
use mz_storage_types::sinks::{S3SinkFormat, S3UploadInfo};
use tracing::info;

use super::{CopyToParameters, CopyToS3Uploader};

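/// A `CopyToS3Uploader` that encodes rows in PgCopy format (CSV or binary,
/// per `CopyFormatParams`) and streams them to S3 via multi-part uploads,
/// starting a new object whenever the current file fills up.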
pub(super) struct PgCopyUploader {
    /// The description of the relation being uploaded.
    desc: RelationDesc,
    /// Parameters controlling how rows are encoded.
    format: CopyFormatParams<'static>,
    /// The index of the current file within the batch, used to name S3 objects.
    file_index: usize,
    /// Computes the bucket and object keys to use for uploads.
    key_manager: S3KeyManager,
    /// The batch this uploader writes files for.
    batch: u64,
    /// The desired maximum file size. A new file upload is started when the
    /// current file would exceed this size.
    max_file_size: u64,
    /// The AWS SDK config. Held in an `Option` so an owned value can be moved
    /// into spawned tokio tasks and put back afterwards.
    sdk_config: Option<SdkConfig>,
    /// The multi-part uploader for the file currently being written, if any.
    current_file_uploader: Option<S3MultiPartUploader>,
    /// Tuning parameters for the upload.
    params: CopyToParameters,
}

impl CopyToS3Uploader for PgCopyUploader {
    fn new(
        sdk_config: SdkConfig,
        connection_details: S3UploadInfo,
        sink_id: &GlobalId,
        batch: u64,
        params: CopyToParameters,
    ) -> Result<PgCopyUploader, anyhow::Error> {
        match connection_details.format {
            S3SinkFormat::PgCopy(format_params) => Ok(PgCopyUploader {
                desc: connection_details.desc,
                sdk_config: Some(sdk_config),
                format: format_params,
                key_manager: S3KeyManager::new(sink_id, &connection_details.uri),
                batch,
                max_file_size: connection_details.max_file_size,
                file_index: 0,
                current_file_uploader: None,
                params,
            }),
            _ => anyhow::bail!("Expected PgCopy format"),
        }
    }

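    /// Finishes any in-progress upload and logs the completed object.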
    async fn finish(&mut self) -> Result<(), anyhow::Error> {
        if let Some(uploader) = self.current_file_uploader.take() {
            // Complete the multi-part upload on a spawned tokio task rather
            // than blocking the calling thread on the S3 calls.
            let handle =
                mz_ore::task::spawn(|| "s3_uploader::finish", async { uploader.finish().await });
            let CompletedUpload {
                part_count,
                total_bytes_uploaded,
                bucket,
                key,
            } = handle.await?;
            info!(
                "finished upload: bucket {}, key {}, bytes_uploaded {}, parts_uploaded {}",
                bucket, key, total_bytes_uploaded, part_count
            );
        }
        Ok(())
    }

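    /// Encodes `row` and appends it to the current file, starting a new file
    /// first if none is open or if the row would push the current file past
    /// `max_file_size`.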
    async fn append_row(&mut self, row: &Row) -> Result<(), anyhow::Error> {
        // Encode the row in the configured PgCopy format.
        let mut buf: Vec<u8> = vec![];
        encode_copy_format(&self.format, row, self.desc.typ(), &mut buf)
            .map_err(|_| anyhow!("error encoding row"))?;

        if self.current_file_uploader.is_none() {
            self.start_new_file_upload().await?;
        }
        let mut uploader = self.current_file_uploader.as_mut().expect("known exists");

        match uploader.buffer_chunk(&buf) {
            Ok(_) => Ok(()),
            Err(S3MultiPartUploadError::UploadExceedsMaxFileLimit(_)) => {
                // The current file is full: finish it, start the next file,
                // and write the row there instead.
                self.start_new_file_upload().await?;
                uploader = self.current_file_uploader.as_mut().expect("known exists");
                uploader.buffer_chunk(&buf)?;
                Ok(())
            }
            Err(e) => Err(e.into()),
        }
    }

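    /// Forces the next appended row to begin a new file.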
    async fn force_new_file(&mut self) -> Result<(), anyhow::Error> {
        self.start_new_file_upload().await
    }
}

impl PgCopyUploader {
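    /// Finishes any current file and starts a multi-part upload for the next
    /// file in the batch.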
    async fn start_new_file_upload(&mut self) -> Result<(), anyhow::Error> {
        self.finish().await?;
        assert_none!(self.current_file_uploader);

        self.file_index += 1;
        let object_key =
            self.key_manager
                .data_key(self.batch, self.file_index, self.format.file_extension());
        let bucket = self.key_manager.bucket.clone();
        info!("starting upload: bucket {}, key {}", &bucket, &object_key);
        // Take the owned sdk_config so it can move into the spawned task; the
        // task hands it back once the uploader has been created.
        let sdk_config = self
            .sdk_config
            .take()
            .expect("sdk_config should always be present");
        let max_file_size = self.max_file_size;
        let part_size_limit = u64::cast_from(self.params.s3_multipart_part_size_bytes);
        let handle = mz_ore::task::spawn(|| "s3_uploader::try_new", async move {
            let uploader = S3MultiPartUploader::try_new(
                &sdk_config,
                bucket,
                object_key,
                S3MultiPartUploaderConfig {
                    part_size_limit,
                    file_size_limit: max_file_size,
                },
            )
            .await;
            (uploader, sdk_config)
        });
        let (uploader, sdk_config) = handle.await;
        self.sdk_config = Some(sdk_config);
        let mut uploader = uploader?;
        // Write the format's header (if any) at the start of the new file.
        if self.format.requires_header() {
            let mut buf: Vec<u8> = vec![];
            encode_copy_format_header(&self.format, &self.desc, &mut buf)
                .map_err(|_| anyhow!("error encoding header"))?;
            uploader.buffer_chunk(&buf)?;
        }
        self.current_file_uploader = Some(uploader);
        Ok(())
    }
}

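// These tests run against a real S3 bucket; outside CI they are skipped
// unless `MZ_S3_UPLOADER_TEST_S3_BUCKET` is set (see
// `s3_bucket_path_for_test`).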
#[cfg(test)]
mod tests {
    use bytesize::ByteSize;
    use mz_pgcopy::CopyFormatParams;
    use mz_repr::{ColumnName, Datum, SqlColumnType, SqlRelationType};
    use uuid::Uuid;

    use super::*;

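    /// Returns the test bucket and a fresh object path prefix, or `None` if
    /// `MZ_S3_UPLOADER_TEST_S3_BUCKET` is not set.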
    fn s3_bucket_path_for_test() -> Option<(String, String)> {
        let bucket = match std::env::var("MZ_S3_UPLOADER_TEST_S3_BUCKET") {
            Ok(bucket) => bucket,
            Err(_) => {
                if mz_ore::env::is_var_truthy("CI") {
                    panic!("CI is supposed to run this test but something has gone wrong!");
                }
                return None;
            }
        };

        let prefix = Uuid::new_v4().to_string();
        let path = format!("cargo_test/{}/file", prefix);
        Some((bucket, path))
    }

    #[mz_ore::test(tokio::test(flavor = "multi_thread"))]
    #[cfg_attr(coverage, ignore)]
    #[cfg_attr(miri, ignore)]
    #[ignore]
    async fn test_multiple_files() -> Result<(), anyhow::Error> {
        let sdk_config = mz_aws_util::defaults().load().await;
        let (bucket, path) = match s3_bucket_path_for_test() {
            Some(tuple) => tuple,
            None => return Ok(()),
        };
        let sink_id = GlobalId::User(123);
        let batch = 456;
        let typ: SqlRelationType = SqlRelationType::new(vec![SqlColumnType {
            scalar_type: mz_repr::SqlScalarType::String,
            nullable: true,
        }]);
        let column_names = vec![ColumnName::from("col1")];
        let desc = RelationDesc::new(typ, column_names.into_iter());
        let mut uploader = PgCopyUploader::new(
            sdk_config.clone(),
            S3UploadInfo {
                uri: format!("s3://{}/{}", bucket, path),
                // Deliberately tiny so the appended rows spill into multiple files.
                max_file_size: ByteSize::b(6).as_u64(),
                desc,
                format: S3SinkFormat::PgCopy(CopyFormatParams::Csv(Default::default())),
            },
            &sink_id,
            batch,
            CopyToParameters {
                s3_multipart_part_size_bytes: 10 * 1024 * 1024,
                arrow_builder_buffer_ratio: 100,
                parquet_row_group_ratio: 100,
            },
        )?;
        let mut row = Row::default();
        // "1234567\n" is 8 bytes, more than max_file_size, yet it still lands
        // in the first file by itself: the limit only forces subsequent rows
        // into a new file (see the expected file contents below).
        row.packer().push(Datum::from("1234567"));
        uploader.append_row(&row).await?;

        // The first file is already over the limit, so this row starts the
        // second file; a NULL encodes as an empty CSV line.
        row.packer().push(Datum::Null);
        uploader.append_row(&row).await?;

        row.packer().push(Datum::from("5678"));
        uploader.append_row(&row).await?;

        uploader.finish().await?;

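        // Read both objects back from S3 and check their contents.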
        let s3_client = mz_aws_util::s3::new_client(&sdk_config);
        let first_file = s3_client
            .get_object()
            .bucket(bucket.clone())
            .key(format!(
                "{}/mz-{}-batch-{:04}-0001.csv",
                path, sink_id, batch
            ))
            .send()
            .await
            .unwrap();

        let body = first_file.body.collect().await.unwrap().into_bytes();
        let expected_body: &[u8] = b"1234567\n";
        assert_eq!(body, *expected_body);

        let second_file = s3_client
            .get_object()
            .bucket(bucket)
            .key(format!(
                "{}/mz-{}-batch-{:04}-0002.csv",
                path, sink_id, batch
            ))
            .send()
            .await
            .unwrap();

        let body = second_file.body.collect().await.unwrap().into_bytes();
        let expected_body: &[u8] = b"\n5678\n";
        assert_eq!(body, *expected_body);

        Ok(())
    }
}