mz_storage_operators/s3_oneshot_sink/parquet.rs
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::sync::Arc;

use aws_types::sdk_config::SdkConfig;
use mz_arrow_util::builder::ArrowBuilder;
use mz_aws_util::s3_uploader::{
    AWS_S3_MAX_PART_COUNT, CompletedUpload, S3MultiPartUploader, S3MultiPartUploaderConfig,
};
use mz_ore::cast::CastFrom;
use mz_ore::future::OreFutureExt;
use mz_repr::{GlobalId, RelationDesc, Row};
use mz_storage_types::sinks::s3_oneshot_sink::S3KeyManager;
use mz_storage_types::sinks::{S3SinkFormat, S3UploadInfo};
use parquet::file::properties::EnabledStatistics;
use parquet::{
    arrow::arrow_writer::ArrowWriter,
    basic::Compression,
    file::properties::{WriterProperties, WriterVersion},
};
use tracing::{debug, info};

use super::{CopyToParameters, CopyToS3Uploader};

/// The default item capacity for the array builders inside the [`ArrowBuilder`]. This is the
/// number of items each builder can hold before it needs to allocate more memory.
const DEFAULT_ARRAY_BUILDER_ITEM_CAPACITY: usize = 1024;
/// The default buffer capacity for the string and binary array builders inside the
/// [`ArrowBuilder`]. This is the number of bytes each builder can hold before it needs to
/// allocate more memory.
const DEFAULT_ARRAY_BUILDER_DATA_CAPACITY: usize = 1024;

/// A [`ParquetUploader`] that writes rows to parquet files and uploads them to S3.
///
/// Spawns all S3 operations in tokio tasks to avoid blocking the surrounding timely context.
///
/// ## Buffering
///
/// There are several layers of buffering in this uploader:
///
/// - The uploader will hold a [`ParquetFile`] object after the first row is added. This
///   [`ParquetFile`] holds an [`ArrowBuilder`] and an [`ArrowWriter`].
///
/// - The [`ArrowBuilder`] builds a structure of in-memory `mz_arrow_util::builder::ColBuilder`s
///   from incoming [`mz_repr::Row`]s. Each `mz_arrow_util::builder::ColBuilder` holds a specific
///   [`arrow::array::builder`] type for constructing a column of the given type. The entire
///   [`ArrowBuilder`] is flushed to the [`ParquetFile`]'s [`ArrowWriter`] by converting it into
///   an [`arrow::record_batch::RecordBatch`] once it holds more than the configured
///   `arrow_builder_buffer_bytes`.
///
/// - The [`ParquetFile`] holds an [`ArrowWriter`] that buffers until it has enough data to write
///   a parquet 'row group'. The 'row group' size is usually based on the number of rows in the
///   [`ArrowWriter`], but we also force it to flush based on data size (see below for more
///   details).
///
/// - When a row group is written out, the active [`ParquetFile`] provides a reference to the row
///   group buffer to its [`S3MultiPartUploader`], which copies the data into its own buffer.
///   If this upload buffer exceeds the configured part size limit, the [`S3MultiPartUploader`]
///   will upload parts to S3 until the upload buffer is below the limit.
///
/// - When the [`ParquetUploader`] is finished, it will flush the active [`ParquetFile`], which
///   will flush its [`ArrowBuilder`] and any open row groups to the [`S3MultiPartUploader`] and
///   upload the remaining parts to S3.
/// ```text
///         ┌───────────────┐
///         │ mz_repr::Rows │
///         └───────┬───────┘
/// ┌───────────────│────────────────────────────────────────────┐
/// │               │                                ParquetFile │
/// │   ┌───────────▼─────────────┐                              │
/// │   │      ArrowBuilder       │                              │
/// │   │                         │    ┌──────────────────┐      │
/// │   │    Vec<ArrowColumn>     │    │   ArrowWriter    │      │
/// │   │ ┌─────────┐ ┌─────────┐ │    │                  │      │
/// │   │ │         │ │         │ │    │   ┌──────────┐   │      │
/// │   │ │ColBuildr│ │ColBuildr│ ├────┼──►│  buffer  │   │      │
/// │   │ │         │ │         │ │    │   └─────┬────┘   │      │
/// │   │ └─────────┘ └─────────┘ │    │         │        │      │
/// │   │                         │    │   ┌─────▼────┐   │      │
/// │   └─────────────────────────┘    │   │ row group│   │      │
/// │                                  │   └─┬────────┘   │      │
/// │                                  │     │            │      │
/// │                                  └─────┼────────────┘      │
/// │                                 ┌──────┼────────────────┐  │
/// │                                 │      │     S3MultiPart│  │
/// │                                 │ ┌────▼─────┐ Uploader │  │
/// │                                 │ │  buffer  │          │  │
/// │  ┌─────────┐                    │ └───┬─────┬┘          │  │
/// │  │ S3 API  │◄───────────────────┤     │     │           │  │
/// │  └─────────┘                    │ ┌───▼──┐ ┌▼─────┐     │  │
/// │                                 │ │ part │ │ part │     │  │
/// │                                 │ └──────┘ └──────┘     │  │
/// │                                 │                       │  │
/// │                                 └───────────────────────┘  │
/// │                                                            │
/// └────────────────────────────────────────────────────────────┘
/// ```
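///
/// At a high level, the surrounding sink operator drives this type through the
/// [`CopyToS3Uploader`] trait roughly as follows. This is an illustrative, non-compiled sketch
/// (the variable names are hypothetical; the real call sites live in the parent module that
/// defines the trait):
///
/// ```ignore
/// let mut uploader = ParquetUploader::new(sdk_config, upload_info, &sink_id, batch, params)?;
/// for row in rows {
///     uploader.append_row(&row).await?;
/// }
/// uploader.finish().await?;
/// ```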
///
/// ## File Size & Buffer Sizes
///
/// We expose a 'MAX FILE SIZE' parameter to the user, but this is difficult to enforce exactly
/// since we don't know the exact size of the data we're writing before a parquet row-group
/// is flushed. This is because the encoded size of the data is different from the in-memory
/// representation, and because the data pages within each column in a row-group are compressed.
/// We also don't know the exact size of the parquet metadata that will be written to the file.
///
/// Therefore we don't use the [`S3MultiPartUploader`]'s hard file-size limit, since it's
/// difficult to handle those errors after we've already flushed data to the [`ArrowWriter`].
/// Instead we implement a crude check ourselves.
///
/// This check aims to hit the max-size limit but may exceed it by some amount. To ensure
/// that amount is small, we set the max row-group size to a configurable ratio (e.g. 20%)
/// of the `max_file_size`. This determines how often we'll flush a row-group, but is only an
/// approximation since the actual size of the row-group is not known until it's written.
/// After each row-group is flushed, the size of the file is checked, and if it has exceeded
/// `max_file_size` a new file is started.
///
/// We also set the max [`ArrowBuilder`] buffer size to a ratio (e.g. 150%) of the row-group
/// size to avoid the [`ArrowWriter`] buffering too much data itself before flushing a
/// row-group. We're aiming for the encoded & compressed size of the [`ArrowBuilder`] data to
/// be roughly equal to the row-group size, but this is only an approximation.
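///
/// As a purely illustrative example (the numbers here are hypothetical, not defaults): with a
/// max file size of 100 MiB, a row-group ratio of 20, and an arrow-builder buffer ratio of 150,
/// the derived sizes would be:
///
/// ```text
/// row_group_size_bytes       = 100 MiB * 20 / 100  = 20 MiB
/// arrow_builder_buffer_bytes =  20 MiB * 150 / 100 = 30 MiB
/// ```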
///
/// TODO: We may want to consider adding additional limits to the buffer sizes to avoid memory
/// issues if the user sets the max file size to be very large.
pub(super) struct ParquetUploader {
    /// The output description.
    desc: Arc<RelationDesc>,
    /// The index of the next file to upload within the batch.
    next_file_index: usize,
    /// Provides the appropriate bucket and object keys to use for uploads.
    key_manager: S3KeyManager,
    /// Identifies the batch that files uploaded by this uploader belong to.
    batch: u64,
    /// The desired file size. A new file upload will be started
    /// when the size exceeds this amount.
    max_file_size: u64,
    /// The aws sdk config.
    sdk_config: Arc<SdkConfig>,
    /// The target size of each parquet row group, derived from `max_file_size` and the
    /// configured row-group ratio.
    row_group_size_bytes: u64,
    /// The max bytes the [`ArrowBuilder`] may buffer before it is flushed to the writer,
    /// derived from `row_group_size_bytes` and the configured buffer ratio.
    arrow_builder_buffer_bytes: u64,
    /// The active parquet file being written to, stored in an option
    /// since it won't be initialized until the first row is appended,
    /// and to make it easier to take ownership when calling in spawned
    /// tokio tasks (to avoid doing I/O in the surrounding timely context).
    active_file: Option<ParquetFile>,
    /// Upload and buffer parameters.
    params: CopyToParameters,
}

impl CopyToS3Uploader for ParquetUploader {
    fn new(
        sdk_config: SdkConfig,
        connection_details: S3UploadInfo,
        sink_id: &GlobalId,
        batch: u64,
        params: CopyToParameters,
    ) -> Result<ParquetUploader, anyhow::Error> {
        if params.parquet_row_group_ratio > 100 {
            anyhow::bail!("parquet_row_group_ratio must be <= 100");
        }
        if params.arrow_builder_buffer_ratio < 100 {
            anyhow::bail!("arrow_builder_buffer_ratio must be >= 100");
        }
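        // Derive the row-group size and the arrow builder's buffer size from the user's max
        // file size using the configured ratios (see the struct-level docs for the reasoning).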
        let row_group_size_bytes =
            connection_details.max_file_size * u64::cast_from(params.parquet_row_group_ratio) / 100;
        let arrow_builder_buffer_bytes =
            row_group_size_bytes * u64::cast_from(params.arrow_builder_buffer_ratio) / 100;

        match connection_details.format {
            S3SinkFormat::Parquet => Ok(ParquetUploader {
                desc: Arc::new(connection_details.desc),
                sdk_config: Arc::new(sdk_config),
                key_manager: S3KeyManager::new(sink_id, &connection_details.uri),
                batch,
                max_file_size: connection_details.max_file_size,
                next_file_index: 0,
                row_group_size_bytes,
                arrow_builder_buffer_bytes,
                active_file: None,
                params,
            }),
            _ => anyhow::bail!("Expected Parquet format"),
        }
    }

    async fn append_row(&mut self, row: &Row) -> Result<(), anyhow::Error> {
        let active_file = match self.active_file {
            Some(ref mut file) => file,
            None => self.start_new_file().await?,
        };

        active_file.add_row(row)?;

        // If this file has gone over the max size, start a new one
        if active_file.size_estimate() >= self.max_file_size {
            debug!("file size limit exceeded, starting new file");
            self.start_new_file().await?;
        }

        Ok(())
    }

    async fn finish(&mut self) -> Result<(), anyhow::Error> {
        if let Some(active_file) = self.active_file.take() {
            active_file
                .finish()
                .run_in_task(|| "ParquetFile::finish")
                .await?;
        }
        Ok(())
    }

    async fn force_new_file(&mut self) -> Result<(), anyhow::Error> {
        self.start_new_file().await?;
        Ok(())
    }
}

impl ParquetUploader {
    /// Starts a new parquet file for upload, finishing the current file if one is active.
    async fn start_new_file(&mut self) -> Result<&mut ParquetFile, anyhow::Error> {
        if let Some(active_file) = self.active_file.take() {
            active_file
                .finish()
                .run_in_task(|| "ParquetFile::finish")
                .await?;
        }
        let object_key = self
            .key_manager
            .data_key(self.batch, self.next_file_index, "parquet");
        self.next_file_index += 1;

        let bucket = self.key_manager.bucket.clone();
        info!("starting upload: bucket {}, key {}", bucket, object_key);
        let new_file = ParquetFile::new(
            bucket,
            object_key,
            Arc::clone(&self.desc),
            Arc::clone(&self.sdk_config),
            self.arrow_builder_buffer_bytes,
            self.row_group_size_bytes,
            u64::cast_from(self.params.s3_multipart_part_size_bytes),
        )
        .run_in_task(|| "ParquetFile::new")
        .await?;

        self.active_file = Some(new_file);
        Ok(self.active_file.as_mut().unwrap())
    }
}

256
257/// Helper to tie the lifecycle of the `ArrowBuilder`, `ArrowWriter`, and `S3MultiPartUploader`
258/// together for a single parquet file.
259struct ParquetFile {
260 /// The active arrow builder.
261 builder: ArrowBuilder,
262 writer: ArrowWriter<Vec<u8>>,
263 // TODO: Consider implementing `tokio::io::AsyncWrite` on `S3MultiPartUploader` which would
264 // allow us to write directly to the uploader instead of buffering the data in a vec first.
265 uploader: S3MultiPartUploader,
266 arrow_builder_buffer_bytes: u64,
267 row_group_size: u64,
268 desc: Arc<RelationDesc>,
269}
270
impl ParquetFile {
    async fn new(
        bucket: String,
        key: String,
        desc: Arc<RelationDesc>,
        sdk_config: Arc<SdkConfig>,
        arrow_builder_buffer_bytes: u64,
        row_group_size: u64,
        part_size_limit: u64,
    ) -> Result<Self, anyhow::Error> {
        let builder = ArrowBuilder::new(
            &desc,
            DEFAULT_ARRAY_BUILDER_ITEM_CAPACITY,
            DEFAULT_ARRAY_BUILDER_DATA_CAPACITY,
        )?;

        let props = WriterProperties::builder()
            // This refers to the number of rows per row-group, which we don't want the writer
            // to enforce since we will flush based on the byte-size of the active row group.
            .set_max_row_group_size(usize::MAX)
            // Max compatibility
            .set_writer_version(WriterVersion::PARQUET_1_0)
            .set_compression(Compression::SNAPPY)
            .set_statistics_enabled(EnabledStatistics::None)
            .build();

        // TODO: Consider using an lgalloc buffer here instead of a vec.
        let writer = ArrowWriter::try_new(Vec::new(), builder.schema().into(), Some(props))?;
        let uploader = S3MultiPartUploader::try_new(
            sdk_config.as_ref(),
            bucket,
            key,
            S3MultiPartUploaderConfig {
                part_size_limit,
                // We are already enforcing the max size ourselves so we set the max size enforced
                // by the uploader to the max file size it will allow based on the part size limit.
                // This is known to be greater than the `MAX_S3_SINK_FILE_SIZE` enforced during
                // sink creation.
                file_size_limit: part_size_limit
                    .checked_mul(AWS_S3_MAX_PART_COUNT.try_into().expect("known safe"))
                    .expect("known safe"),
            },
        )
        .await?;

        Ok(Self {
            writer,
            uploader,
            arrow_builder_buffer_bytes,
            row_group_size,
            desc,
            builder,
        })
    }

    fn add_row(&mut self, row: &Row) -> Result<(), anyhow::Error> {
        self.builder.add_row(row)?;

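        // Flush the builder to the writer once it has buffered more than the configured byte
        // limit (see the buffering notes in the `ParquetUploader` docs above).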
        if u64::cast_from(self.builder.row_size_bytes()) > self.arrow_builder_buffer_bytes {
            self.flush_builder()?;
        }

        Ok(())
    }

    /// Flushes the current arrow builder, the parquet writer, and the uploader, finishing the
    /// upload to S3.
    async fn finish(mut self) -> Result<CompletedUpload, anyhow::Error> {
        self.flush_builder()?;
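        // Closing the writer via `into_inner` finalizes the parquet file (flushing any open row
        // group and writing the footer) and hands back the underlying buffer.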
        let buffer = self.writer.into_inner()?;
        self.uploader.buffer_chunk(buffer.as_slice())?;
        let res = self.uploader.finish().await?;
        info!(
            "finished upload: bucket {}, key {}, bytes_uploaded {}, parts_uploaded {}",
            res.bucket, res.key, res.total_bytes_uploaded, res.part_count
        );
        Ok(res)
    }

    /// Flushes the current arrow builder to the parquet writer, then flushes the writer's buffer
    /// to the uploader, which may trigger an upload.
    fn flush_builder(&mut self) -> Result<(), anyhow::Error> {
        let builder = std::mem::replace(
            &mut self.builder,
            ArrowBuilder::new(
                &self.desc,
                DEFAULT_ARRAY_BUILDER_ITEM_CAPACITY,
                DEFAULT_ARRAY_BUILDER_DATA_CAPACITY,
            )?,
        );
        let arrow_batch = builder.to_record_batch()?;

        if arrow_batch.num_rows() == 0 {
            return Ok(());
        }

        let before_groups = self.writer.flushed_row_groups().len();
        self.writer.write(&arrow_batch)?;

        // The writer will flush its buffer to a new parquet row group based on the row count,
        // not the actual size of the data. We flush manually to allow uploading the data in
        // potentially smaller chunks.
        if u64::cast_from(self.writer.in_progress_size()) > self.row_group_size {
            self.writer.flush()?;
        }

        // If the writer has flushed a new row group we can steal its buffer and upload it.
        if self.writer.flushed_row_groups().len() > before_groups {
            let buffer = self.writer.inner_mut();
            self.uploader.buffer_chunk(buffer.as_slice())?;
            // Reuse the buffer in the writer.
            buffer.clear();
        }
        Ok(())
    }

    /// Returns an approximate size estimate of the file being written.
    fn size_estimate(&self) -> u64 {
        // `ArrowWriter::in_progress_size()` is just an estimate since it doesn't seem
        // to account for data page compression and metadata that will be written for the next
        // row-group.
        u64::cast_from(self.writer.in_progress_size()) + self.uploader.added_bytes()
    }
}