mz_storage_operators/s3_oneshot_sink/parquet.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::sync::Arc;
11
12use aws_types::sdk_config::SdkConfig;
13use mz_arrow_util::builder::ArrowBuilder;
14use mz_aws_util::s3_uploader::{
15    AWS_S3_MAX_PART_COUNT, CompletedUpload, S3MultiPartUploader, S3MultiPartUploaderConfig,
16};
17use mz_ore::cast::CastFrom;
18use mz_ore::future::OreFutureExt;
19use mz_repr::{GlobalId, RelationDesc, Row};
20use mz_storage_types::sinks::s3_oneshot_sink::S3KeyManager;
21use mz_storage_types::sinks::{S3SinkFormat, S3UploadInfo};
22use parquet::file::properties::EnabledStatistics;
23use parquet::{
24    arrow::arrow_writer::ArrowWriter,
25    basic::Compression,
26    file::properties::{WriterProperties, WriterVersion},
27};
28use tracing::{debug, info};
29
30use super::{CopyToParameters, CopyToS3Uploader};
31
/// Default item capacity passed to `ArrowBuilder::new` for the array builders inside the
/// ArrowBuilder. This is the number of items each builder can hold before it needs to
/// allocate more memory.
const DEFAULT_ARRAY_BUILDER_ITEM_CAPACITY: usize = 1024;
/// Default data capacity passed to `ArrowBuilder::new` for the string and binary array
/// builders inside the ArrowBuilder. This is the number of bytes each builder can hold
/// before it needs to allocate more memory.
const DEFAULT_ARRAY_BUILDER_DATA_CAPACITY: usize = 1024;
39
40/// A [`ParquetUploader`] that writes rows to parquet files and uploads them to S3.
41///
42/// Spawns all S3 operations in tokio tasks to avoid blocking the surrounding timely context.
43///
44/// ## Buffering
45///
46/// There are several layers of buffering in this uploader:
47///
48/// - The uploader will hold a [`ParquetFile`] object after the first row is added. This
49///   [`ParquetFile`] holds an [`ArrowBuilder`] and an [`ArrowWriter`].
50///
51/// - The [`ArrowBuilder`] builds a structure of in-memory `mz_arrow_util::builder::ColBuilder`s from incoming
52///   [`mz_repr::Row`]s. Each `mz_arrow_util::builder::ColBuilder` holds a specific [`arrow::array::builder`] type
53///   for constructing a column of the given type. The entire [`ArrowBuilder`] is flushed to
///   the [`ParquetFile`]'s [`ArrowWriter`] by converting it into an [`arrow::record_batch::RecordBatch`] once we've
55///   given it more than the configured arrow_builder_buffer_bytes.
56///
/// - The [`ParquetFile`] holds an [`ArrowWriter`] that buffers until it has enough data to write
58///   a parquet 'row group'. The 'row group' size is usually based on the number of rows (in the
59///   ArrowWriter), but we also force it to flush based on data-size (see below for more details).
60///
61/// - When a row group is written out, the active [`ParquetFile`] provides a reference to the row
62///   group buffer to its [`S3MultiPartUploader`] which will copy the data to its own buffer.
63///   If this upload buffer exceeds the configured part size limit, the [`S3MultiPartUploader`]
64///   will upload parts to S3 until the upload buffer is below the limit.
65///
66/// - When the [`ParquetUploader`] is finished, it will flush the active [`ParquetFile`] which will
67///   flush its [`ArrowBuilder`] and any open row groups to the [`S3MultiPartUploader`] and upload
68///   the remaining parts to S3.
69/// ```text
70///       ┌───────────────┐
71///       │ mz_repr::Rows │
72///       └───────┬───────┘
73/// ┌─────────────│───────────────────────────────────────────┐
74/// │             │         ParquetFile                       │
75/// │ ┌───────────▼─────────────┐                             │
76/// │ │       ArrowBuilder      │                             │
77/// │ │                         │    ┌──────────────────┐     │
78/// │ │     Vec<ArrowColumn>    │    │    ArrowWriter   │     │
79/// │ │ ┌─────────┐ ┌─────────┐ │    │                  │     │
80/// │ │ │         │ │         │ │    │   ┌──────────┐   │     │
81/// │ │ │ColBuildr│ │ColBuildr│ ├────┼──►│  buffer  │   │     │
82/// │ │ │         │ │         │ │    │   └─────┬────┘   │     │
83/// │ │ └─────────┘ └─────────┘ │    │         │        │     │
84/// │ │                         │    │   ┌─────▼────┐   │     │
85/// │ └─────────────────────────┘    │   │ row group│   │     │
86/// │                                │   └─┬────────┘   │     │
87/// │                                │     │            │     │
88/// │                                └─────┼────────────┘     │
89/// │                               ┌──────┼────────────────┐ │
90/// │                               │      │     S3MultiPart│ │
91/// │                               │ ┌────▼─────┐ Uploader │ │
92/// │                               │ │  buffer  │          │ │
93/// │    ┌─────────┐                │ └───┬─────┬┘          │ │
94/// │    │ S3 API  │◄───────────────┤     │     │           │ │
95/// │    └─────────┘                │ ┌───▼──┐ ┌▼─────┐     │ │
96/// │                               │ │ part │ │ part │     │ │
97/// │                               │ └──────┘ └──────┘     │ │
98/// │                               │                       │ │
99/// │                               └───────────────────────┘ │
100/// │                                                         │
101/// └─────────────────────────────────────────────────────────┘
102/// ```
103///
104/// ## File Size & Buffer Sizes
105///
106/// We expose a 'MAX FILE SIZE' parameter to the user, but this is difficult to enforce exactly
107/// since we don't know the exact size of the data we're writing before a parquet row-group
108/// is flushed. This is because the encoded size of the data is different than the in-memory
109/// representation and because the data pages within each column in a row-group are compressed.
110/// We also don't know the exact size of the parquet metadata that will be written to the file.
111///
112/// Therefore we don't use the S3MultiPartUploader's hard file size limit since it's difficult
113/// to handle those errors after we've already flushed data to the ArrowWriter. Instead we
114/// implement a crude check ourselves.
115///
116/// This check aims to hit the max-size limit but may exceed it by some amount. To ensure
117/// that amount is small, we set the max row-group size to a configurable ratio (e.g. 20%)
118/// of the max_file_size. This determines how often we'll flush a row-group, but is only an
119/// approximation since the actual size of the row-group is not known until it's written.
120/// After each row-group is flushed, the size of the file is checked and if it's exceeded
121/// max-file-size a new file is started.
122///
123/// We also set the max ArrowBuilder buffer size to a ratio (e.g. 150%) of the row-group size
124/// to avoid the ArrowWriter buffering too much data itself before flushing a row-group. We're
125/// aiming for the encoded & compressed size of the ArrowBuilder data to be roughly equal to
126/// the row-group size, but this is only an approximation.
127///
128/// TODO: We may want to consider adding additional limits to the buffer sizes to avoid memory
129/// issues if the user sets the max file size to be very large.
pub(super) struct ParquetUploader {
    /// The output description.
    desc: Arc<RelationDesc>,
    /// The index of the next file to upload within the batch.
    next_file_index: usize,
    /// Provides the appropriate bucket and object keys to use for uploads.
    key_manager: S3KeyManager,
    /// Identifies the batch that files uploaded by this uploader belong to.
    batch: u64,
    /// The desired file size. A new file upload will be started once the
    /// estimated size of the active file reaches this amount.
    max_file_size: u64,
    /// The aws sdk config.
    sdk_config: Arc<SdkConfig>,
    /// Row-group flush threshold in bytes, derived in `new` as
    /// `parquet_row_group_ratio` percent of `max_file_size`. Handed to each
    /// [`ParquetFile`], which flushes the writer's in-progress row group once
    /// its estimated size exceeds this value.
    row_group_size_bytes: u64,
    /// Arrow builder flush threshold in bytes, derived in `new` as
    /// `arrow_builder_buffer_ratio` percent of `row_group_size_bytes`. Handed
    /// to each [`ParquetFile`], which flushes its builder into the parquet
    /// writer once the buffered rows exceed this value.
    arrow_builder_buffer_bytes: u64,
    /// The active parquet file being written to, stored in an option
    /// since it won't be initialized until the first row is appended (or a
    /// new file is explicitly forced), and to make it easier to take
    /// ownership when calling in spawned tokio tasks (to avoid doing I/O in
    /// the surrounding timely context).
    active_file: Option<ParquetFile>,
    /// Upload and buffer params
    params: CopyToParameters,
}
154
155impl CopyToS3Uploader for ParquetUploader {
156    fn new(
157        sdk_config: SdkConfig,
158        connection_details: S3UploadInfo,
159        sink_id: &GlobalId,
160        batch: u64,
161        params: CopyToParameters,
162    ) -> Result<ParquetUploader, anyhow::Error> {
163        if params.parquet_row_group_ratio > 100 {
164            anyhow::bail!("parquet_row_group_ratio must be <= 100");
165        }
166        if params.arrow_builder_buffer_ratio < 100 {
167            anyhow::bail!("arrow_builder_buffer_ratio must be >= 100");
168        }
169        let row_group_size_bytes =
170            connection_details.max_file_size * u64::cast_from(params.parquet_row_group_ratio) / 100;
171        let arrow_builder_buffer_bytes =
172            row_group_size_bytes * u64::cast_from(params.arrow_builder_buffer_ratio) / 100;
173
174        match connection_details.format {
175            S3SinkFormat::Parquet => Ok(ParquetUploader {
176                desc: Arc::new(connection_details.desc),
177                sdk_config: Arc::new(sdk_config),
178                key_manager: S3KeyManager::new(sink_id, &connection_details.uri),
179                batch,
180                max_file_size: connection_details.max_file_size,
181                next_file_index: 0,
182                row_group_size_bytes,
183                arrow_builder_buffer_bytes,
184                active_file: None,
185                params,
186            }),
187            _ => anyhow::bail!("Expected Parquet format"),
188        }
189    }
190
191    async fn append_row(&mut self, row: &Row) -> Result<(), anyhow::Error> {
192        let active_file = match self.active_file {
193            Some(ref mut file) => file,
194            None => self.start_new_file().await?,
195        };
196
197        active_file.add_row(row)?;
198
199        // If this file has gone over the max size, start a new one
200        if active_file.size_estimate() >= self.max_file_size {
201            debug!("file size limit exceeded, starting new file");
202            self.start_new_file().await?;
203        }
204
205        Ok(())
206    }
207
208    async fn finish(&mut self) -> Result<(), anyhow::Error> {
209        if let Some(active_file) = self.active_file.take() {
210            active_file
211                .finish()
212                .run_in_task(|| "ParquetFile::finish")
213                .await?;
214        }
215        Ok(())
216    }
217
218    async fn force_new_file(&mut self) -> Result<(), anyhow::Error> {
219        self.start_new_file().await?;
220        Ok(())
221    }
222}
223
224impl ParquetUploader {
225    /// Start a new parquet file for upload. Will finish the current file if one is active.
226    async fn start_new_file(&mut self) -> Result<&mut ParquetFile, anyhow::Error> {
227        if let Some(active_file) = self.active_file.take() {
228            active_file
229                .finish()
230                .run_in_task(|| "ParquetFile::finish")
231                .await?;
232        }
233        let object_key = self
234            .key_manager
235            .data_key(self.batch, self.next_file_index, "parquet");
236        self.next_file_index += 1;
237
238        let bucket = self.key_manager.bucket.clone();
239        info!("starting upload: bucket {}, key {}", bucket, object_key);
240        let new_file = ParquetFile::new(
241            bucket,
242            object_key,
243            Arc::clone(&self.desc),
244            Arc::clone(&self.sdk_config),
245            self.arrow_builder_buffer_bytes,
246            self.row_group_size_bytes,
247            u64::cast_from(self.params.s3_multipart_part_size_bytes),
248        )
249        .run_in_task(|| "ParquetFile::new")
250        .await?;
251
252        self.active_file = Some(new_file);
253        Ok(self.active_file.as_mut().unwrap())
254    }
255}
256
/// Helper to tie the lifecycle of the `ArrowBuilder`, `ArrowWriter`, and `S3MultiPartUploader`
/// together for a single parquet file.
struct ParquetFile {
    /// The active arrow builder.
    builder: ArrowBuilder,
    /// The parquet writer that record batches from `builder` are written into. It writes
    /// into an in-memory `Vec<u8>` whose contents are handed to `uploader` whenever a new
    /// row group has been flushed.
    writer: ArrowWriter<Vec<u8>>,
    // TODO: Consider implementing `tokio::io::AsyncWrite` on `S3MultiPartUploader` which would
    // allow us to write directly to the uploader instead of buffering the data in a vec first.
    /// The multi-part uploader that receives the bytes produced by `writer`.
    uploader: S3MultiPartUploader,
    /// Once the rows buffered in `builder` exceed this many bytes, the builder is flushed
    /// into `writer`.
    arrow_builder_buffer_bytes: u64,
    /// Once the writer's in-progress size exceeds this many bytes, the writer is flushed
    /// to close out the current row group.
    row_group_size: u64,
    /// The output description, kept so a fresh `ArrowBuilder` can be created each time
    /// the current one is flushed.
    desc: Arc<RelationDesc>,
}
270
271impl ParquetFile {
272    async fn new(
273        bucket: String,
274        key: String,
275        desc: Arc<RelationDesc>,
276        sdk_config: Arc<SdkConfig>,
277        arrow_builder_buffer_bytes: u64,
278        row_group_size: u64,
279        part_size_limit: u64,
280    ) -> Result<Self, anyhow::Error> {
281        let builder = ArrowBuilder::new(
282            &desc,
283            DEFAULT_ARRAY_BUILDER_ITEM_CAPACITY,
284            DEFAULT_ARRAY_BUILDER_DATA_CAPACITY,
285        )?;
286
287        let props = WriterProperties::builder()
288            // This refers to the number of rows per row-group, which we don't want the writer
289            // to enforce since we will flush based on the byte-size of the active row group
290            .set_max_row_group_size(usize::MAX)
291            // Max compatibility
292            .set_writer_version(WriterVersion::PARQUET_1_0)
293            .set_compression(Compression::SNAPPY)
294            .set_statistics_enabled(EnabledStatistics::None)
295            .build();
296
297        // TODO: Consider using an lgalloc buffer here instead of a vec
298        let writer = ArrowWriter::try_new(Vec::new(), builder.schema().into(), Some(props))?;
299        let uploader = S3MultiPartUploader::try_new(
300            sdk_config.as_ref(),
301            bucket,
302            key,
303            S3MultiPartUploaderConfig {
304                part_size_limit,
305                // We are already enforcing the max size ourselves so we set the max size enforced
306                // by the uploader to the max file size it will allow based on the part size limit.
307                // This is known to be greater than the `MAX_S3_SINK_FILE_SIZE` enforced during
308                // sink creation.
309                file_size_limit: part_size_limit
310                    .checked_mul(AWS_S3_MAX_PART_COUNT.try_into().expect("known safe"))
311                    .expect("known safe"),
312            },
313        )
314        .await?;
315
316        Ok(Self {
317            writer,
318            uploader,
319            arrow_builder_buffer_bytes,
320            row_group_size,
321            desc,
322            builder,
323        })
324    }
325
326    fn add_row(&mut self, row: &Row) -> Result<(), anyhow::Error> {
327        self.builder.add_row(row)?;
328
329        if u64::cast_from(self.builder.row_size_bytes()) > self.arrow_builder_buffer_bytes {
330            self.flush_builder()?;
331        }
332
333        Ok(())
334    }
335
336    /// Flush the current arrow builder, the parquet writer, and the uploader.
337    async fn finish(mut self) -> Result<CompletedUpload, anyhow::Error> {
338        self.flush_builder()?;
339        let buffer = self.writer.into_inner()?;
340        self.uploader.buffer_chunk(buffer.as_slice())?;
341        let res = self.uploader.finish().await?;
342        info!(
343            "finished upload: bucket {}, key {}, bytes_uploaded {}, parts_uploaded {}",
344            res.bucket, res.key, res.total_bytes_uploaded, res.part_count
345        );
346        Ok(res)
347    }
348
349    /// Flush the current arrow builder to the parquet writer then flushes the writer's buffer to
350    /// the uploader which may trigger an upload.
351    fn flush_builder(&mut self) -> Result<(), anyhow::Error> {
352        let builder = std::mem::replace(
353            &mut self.builder,
354            ArrowBuilder::new(
355                &self.desc,
356                DEFAULT_ARRAY_BUILDER_ITEM_CAPACITY,
357                DEFAULT_ARRAY_BUILDER_DATA_CAPACITY,
358            )?,
359        );
360        let arrow_batch = builder.to_record_batch()?;
361
362        if arrow_batch.num_rows() == 0 {
363            return Ok(());
364        }
365
366        let before_groups = self.writer.flushed_row_groups().len();
367        self.writer.write(&arrow_batch)?;
368
369        // The writer will flush its buffer to a new parquet row group based on the row count,
370        // not the actual size of the data. We flush manually to allow uploading the data in
371        // potentially smaller chunks.
372        if u64::cast_from(self.writer.in_progress_size()) > self.row_group_size {
373            self.writer.flush()?;
374        }
375
376        // If the writer has flushed a new row group we can steal its buffer and upload it.
377        if self.writer.flushed_row_groups().len() > before_groups {
378            let buffer = self.writer.inner_mut();
379            self.uploader.buffer_chunk(buffer.as_slice())?;
380            // reuse the buffer in the writer
381            buffer.clear();
382        }
383        Ok(())
384    }
385
386    /// Returns an approximate size estimate of the file being written.
387    fn size_estimate(&self) -> u64 {
388        // ArrowWriter.in_progress_size() is just an estimate since it doesn't seem
389        // to account for data page compression and metadata that will be written for the next
390        // row-group.
391        u64::cast_from(self.writer.in_progress_size()) + self.uploader.added_bytes()
392    }
393}