Skip to main content

mz_storage_operators/oneshot_source/
parquet.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Parquet [`OneshotFormat`].
11
12use std::fmt;
13use std::sync::Arc;
14
15use arrow::array::{RecordBatch, StructArray};
16use arrow_ipc::writer::StreamWriter;
17use bytes::{Bytes, BytesMut};
18use futures::future::BoxFuture;
19use futures::stream::BoxStream;
20use futures::{StreamExt, TryStreamExt};
21use mz_ore::cast::CastFrom;
22use mz_ore::collections::CollectionExt;
23use mz_repr::RelationDesc;
24use parquet::arrow::ParquetRecordBatchStreamBuilder;
25use parquet::arrow::async_reader::{AsyncFileReader, MetadataFetch};
26use parquet::errors::ParquetError;
27use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
28use serde::{Deserialize, Serialize};
29use smallvec::{SmallVec, smallvec};
30
31use crate::oneshot_source::{
32    OneshotFormat, OneshotObject, OneshotSource, StorageErrorX, StorageErrorXContext,
33    StorageErrorXKind,
34};
35
36#[derive(Debug, Clone)]
37pub struct ParquetFormat {
38    desc: RelationDesc,
39}
40
41impl ParquetFormat {
42    pub fn new(desc: RelationDesc) -> Self {
43        ParquetFormat { desc }
44    }
45}
46
/// A single unit of Parquet decoding work: which row groups of which object a
/// worker should fetch and decode.
///
/// Derives `Serialize`/`Deserialize` so requests can be exchanged between
/// workers; the field names are part of that serialized form.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ParquetWorkRequest<O, C> {
    /// Object to read from.
    object: O,
    /// Checksum for the object; passed through to [`OneshotSource::get`] when fetching.
    checksum: C,
    /// Indexes of the row groups to read. Currently always a single row group
    /// (see `split_work`), hence the inline capacity of 1.
    row_groups: SmallVec<[usize; 1]>,
}
53
/// A chunk of decoded Parquet data, materialized as a single Arrow
/// [`RecordBatch`].
///
/// Serialization is implemented manually below via the Arrow IPC format.
#[derive(Clone, Debug)]
pub struct ParquetRowGroup {
    /// The record batch holding this chunk's data.
    record_batch: RecordBatch,
}
58
59impl OneshotFormat for ParquetFormat {
60    type WorkRequest<S>
61        = ParquetWorkRequest<S::Object, S::Checksum>
62    where
63        S: OneshotSource;
64    type RecordChunk = ParquetRowGroup;
65
66    async fn split_work<S: OneshotSource + Send>(
67        &self,
68        source: S,
69        object: S::Object,
70        checksum: S::Checksum,
71    ) -> Result<Vec<Self::WorkRequest<S>>, StorageErrorX> {
72        let mut adapter = ParquetReaderAdapter::new(source, object.clone(), checksum.clone());
73        let parquet_metadata = adapter.get_metadata(None).await?;
74
75        tracing::info!(
76            object = object.name(),
77            schema = ?parquet_metadata.file_metadata().schema_descr(),
78            row_groups = parquet_metadata.num_row_groups(),
79            "splitting Parquet object"
80        );
81
82        // Split up the file by the number of RowGroups.
83        //
84        // TODO(cf3): Support splitting up large RowGroups.
85        let work = (0..parquet_metadata.num_row_groups())
86            .map(|row_group| ParquetWorkRequest {
87                object: object.clone(),
88                checksum: checksum.clone(),
89                row_groups: smallvec![row_group],
90            })
91            .collect();
92
93        Ok(work)
94    }
95
96    fn fetch_work<'a, S: OneshotSource + Sync + 'static>(
97        &'a self,
98        source: &'a S,
99        request: Self::WorkRequest<S>,
100    ) -> BoxStream<'a, Result<Self::RecordChunk, StorageErrorX>> {
101        let ParquetWorkRequest {
102            object,
103            checksum,
104            row_groups,
105        } = request;
106
107        let adapter = ParquetReaderAdapter::new(source.clone(), object.clone(), checksum.clone());
108
109        let initial_work = async move {
110            ParquetRecordBatchStreamBuilder::new(adapter)
111                .await?
112                .with_row_groups(row_groups.to_vec())
113                .build()
114        };
115
116        futures::stream::once(initial_work)
117            .try_flatten()
118            .map_ok(|record_batch| ParquetRowGroup { record_batch })
119            .err_into()
120            .boxed()
121    }
122
123    fn decode_chunk(
124        &self,
125        chunk: Self::RecordChunk,
126        rows: &mut Vec<mz_repr::Row>,
127    ) -> Result<usize, StorageErrorX> {
128        let ParquetRowGroup { record_batch } = chunk;
129
130        let struct_array = StructArray::from(record_batch);
131        let reader = mz_arrow_util::reader::ArrowReader::new(&self.desc, struct_array)
132            .inspect_err(|err| tracing::warn!("error: {err:#?}"))
133            .map_err(|err| StorageErrorXKind::ParquetError(err.to_string().into()))
134            .context("reader")?;
135        let rows_read = reader
136            .read_all(rows)
137            .inspect_err(|err| tracing::warn!("error: {err:#?}"))
138            .map_err(|err| StorageErrorXKind::ParquetError(err.to_string().into()))
139            .context("read_all")?;
140
141        Ok(rows_read)
142    }
143}
144
/// A newtype wrapper around a [`OneshotSource`] that allows us to implement
/// [`AsyncFileReader`] and [`MetadataFetch`] for all types that implement
/// [`OneshotSource`].
#[derive(Clone)]
struct ParquetReaderAdapter<S: OneshotSource> {
    /// The source we fetch bytes from.
    source: S,
    /// The specific object within `source` being read.
    object: S::Object,
    /// Checksum for `object`; passed through to every [`OneshotSource::get`] call.
    checksum: S::Checksum,
}
154
155impl<S: OneshotSource> fmt::Debug for ParquetReaderAdapter<S> {
156    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
157        f.debug_struct("ObjectStoreAdapter").finish()
158    }
159}
160
161impl<S: OneshotSource> ParquetReaderAdapter<S> {
162    fn new(source: S, object: S::Object, checksum: S::Checksum) -> Self {
163        ParquetReaderAdapter {
164            source,
165            object,
166            checksum,
167        }
168    }
169}
170
171impl<S: OneshotSource> MetadataFetch for ParquetReaderAdapter<S> {
172    fn fetch(
173        &mut self,
174        range: std::ops::Range<u64>,
175    ) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
176        let inclusive_end = std::cmp::max(range.start, range.end.saturating_sub(1));
177
178        Box::pin(async move {
179            let range_start = usize::cast_from(range.start);
180            let inclusive_end = usize::cast_from(inclusive_end);
181            // Fetch the specified range.
182            let result: Result<Vec<_>, _> = self
183                .source
184                .get(
185                    self.object.clone(),
186                    self.checksum.clone(),
187                    Some(range_start..=inclusive_end),
188                )
189                .try_collect()
190                .await;
191            let bytes = match result {
192                Err(e) => return Err(ParquetError::General(e.to_string())),
193                Ok(bytes) => bytes,
194            };
195
196            // Join the stream into a single chunk.
197            let total_length = inclusive_end.saturating_sub(range_start);
198            let mut joined_bytes = BytesMut::with_capacity(total_length);
199            joined_bytes.extend(bytes);
200
201            Ok(joined_bytes.freeze())
202        })
203    }
204}
205
impl<S: OneshotSource> AsyncFileReader for ParquetReaderAdapter<S> {
    // A plain byte-range read is identical to a metadata fetch here, so
    // delegate to the `MetadataFetch` impl above.
    fn get_bytes(
        &mut self,
        range: std::ops::Range<u64>,
    ) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
        MetadataFetch::fetch(self, range)
    }

    // Loads the Parquet footer metadata by handing `self` (as a
    // `MetadataFetch`) to `ParquetMetaDataReader`, which issues the actual
    // range reads. `_options` is currently ignored.
    fn get_metadata<'a>(
        &'a mut self,
        _options: Option<&'a parquet::arrow::arrow_reader::ArrowReaderOptions>,
    ) -> BoxFuture<'a, parquet::errors::Result<Arc<ParquetMetaData>>> {
        Box::pin(async move {
            let mut reader = ParquetMetaDataReader::new();
            // The reader needs the total object size to locate the footer at
            // the tail of the file.
            let object_size = u64::cast_from(self.object.size());
            reader.try_load(self, object_size).await?;
            reader.finish().map(Arc::new)
        })
    }
}
226
227// Note(parkmycar): Instead of a manual implementation of Serialize and Deserialize we could
228// change `ParquetRowGroup` to have a type which we can derive the impl for. But no types from the
229// `arrow` crate do, and we'd prefer not to use a Vec<u8> since serialization is only required when
230// Timely workers span multiple processes.
231impl Serialize for ParquetRowGroup {
232    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
233    where
234        S: serde::Serializer,
235    {
236        // Note: This implementation isn't very efficient, but it should only be rarely used so
237        // it's not too much of a concern.
238        let mut buf = Vec::new();
239        let mut writer = StreamWriter::try_new(&mut buf, &self.record_batch.schema())
240            .map_err(serde::ser::Error::custom)?;
241        writer
242            .write(&self.record_batch)
243            .map_err(serde::ser::Error::custom)?;
244        writer.finish().map_err(serde::ser::Error::custom)?;
245        buf.serialize(serializer)
246    }
247}
248
249impl<'de> Deserialize<'de> for ParquetRowGroup {
250    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
251    where
252        D: serde::Deserializer<'de>,
253    {
254        let bytes = <Vec<u8>>::deserialize(deserializer)?;
255        let reader = arrow_ipc::reader::StreamReader::try_new(bytes.as_slice(), None)
256            .map_err(serde::de::Error::custom)?;
257
258        let record_batch = reader
259            .expect_element(|| "expected exactly one record batch in IPC stream")
260            .map_err(serde::de::Error::custom)?;
261
262        Ok(ParquetRowGroup { record_batch })
263    }
264}