mz_storage_operators/oneshot_source/parquet.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Parquet [`OneshotFormat`].
//!
//! Splits a Parquet object into one unit of work per row group, streams each
//! row group back as an Arrow [`RecordBatch`], and decodes the batches into
//! [`Row`](mz_repr::Row)s.
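//!
//! A rough sketch of how a pipeline drives this format (the `source`,
//! `object`, `checksum`, and `desc` values are assumed to come from a
//! [`OneshotSource`] implementation and the surrounding operator):
//!
//! ```ignore
//! let format = ParquetFormat::new(desc);
//! // One work request per row group in the object.
//! let requests = format.split_work(source.clone(), object, checksum).await?;
//! for request in requests {
//!     let mut chunks = format.fetch_work(&source, request);
//!     while let Some(chunk) = chunks.try_next().await? {
//!         let mut rows = Vec::new();
//!         let _count = format.decode_chunk(chunk, &mut rows)?;
//!     }
//! }
//! ```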

use std::fmt;
use std::sync::Arc;

use arrow::array::{Array, RecordBatch, StructArray};
use bytes::{Bytes, BytesMut};
use futures::future::BoxFuture;
use futures::stream::BoxStream;
use futures::{StreamExt, TryStreamExt};
use mz_ore::cast::CastFrom;
use mz_persist_types::arrow::ProtoArrayData;
use mz_proto::{ProtoType, RustType};
use mz_repr::RelationDesc;
use parquet::arrow::ParquetRecordBatchStreamBuilder;
use parquet::arrow::async_reader::{AsyncFileReader, MetadataFetch};
use parquet::errors::ParquetError;
use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
use prost::Message;
use serde::de::Visitor;
use serde::{Deserialize, Deserializer, Serialize};
use smallvec::{SmallVec, smallvec};

use crate::oneshot_source::{
    OneshotFormat, OneshotObject, OneshotSource, StorageErrorX, StorageErrorXContext,
    StorageErrorXKind,
};

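/// A [`OneshotFormat`] that decodes Parquet data into [`Row`](mz_repr::Row)s
/// matching the provided [`RelationDesc`].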
#[derive(Debug, Clone)]
pub struct ParquetFormat {
    desc: RelationDesc,
}

impl ParquetFormat {
    pub fn new(desc: RelationDesc) -> Self {
        ParquetFormat { desc }
    }
}

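/// A single unit of work for copying a Parquet object: the row groups to read
/// from a specific object.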
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ParquetWorkRequest<O, C> {
    object: O,
    checksum: C,
    row_groups: SmallVec<[usize; 1]>,
}

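/// A chunk of records decoded from a Parquet object: a single [`RecordBatch`].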
#[derive(Clone, Debug)]
pub struct ParquetRowGroup {
    record_batch: RecordBatch,
}

impl OneshotFormat for ParquetFormat {
    type WorkRequest<S>
        = ParquetWorkRequest<S::Object, S::Checksum>
    where
        S: OneshotSource;
    type RecordChunk = ParquetRowGroup;

    async fn split_work<S: OneshotSource + Send>(
        &self,
        source: S,
        object: S::Object,
        checksum: S::Checksum,
    ) -> Result<Vec<Self::WorkRequest<S>>, StorageErrorX> {
        let mut adapter = ParquetReaderAdapter::new(source, object.clone(), checksum.clone());
        let parquet_metadata = adapter.get_metadata(None).await?;

        tracing::info!(
            object = object.name(),
            row_groups = parquet_metadata.num_row_groups(),
            "splitting Parquet object"
        );

        // Split up the file by the number of RowGroups.
        //
        // TODO(cf3): Support splitting up large RowGroups.
        let work = (0..parquet_metadata.num_row_groups())
            .map(|row_group| ParquetWorkRequest {
                object: object.clone(),
                checksum: checksum.clone(),
                row_groups: smallvec![row_group],
            })
            .collect();

        Ok(work)
    }

    fn fetch_work<'a, S: OneshotSource + Sync + 'static>(
        &'a self,
        source: &'a S,
        request: Self::WorkRequest<S>,
    ) -> BoxStream<'a, Result<Self::RecordChunk, StorageErrorX>> {
        let ParquetWorkRequest {
            object,
            checksum,
            row_groups,
        } = request;

        let adapter = ParquetReaderAdapter::new(source.clone(), object.clone(), checksum.clone());

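        // Building the reader requires I/O (it fetches the Parquet metadata),
        // so defer it into a future, restricted to the requested row groups.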
        let initial_work = async move {
            ParquetRecordBatchStreamBuilder::new(adapter)
                .await?
                .with_row_groups(row_groups.to_vec())
                .build()
        };

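        // `initial_work` resolves to a stream of `RecordBatch`es; wrapping it
        // in a one-element stream and flattening yields a single stream of
        // record chunks.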
        futures::stream::once(initial_work)
            .try_flatten()
            .map_ok(|record_batch| ParquetRowGroup { record_batch })
            .err_into()
            .boxed()
    }

    fn decode_chunk(
        &self,
        chunk: Self::RecordChunk,
        rows: &mut Vec<mz_repr::Row>,
    ) -> Result<usize, StorageErrorX> {
        let ParquetRowGroup { record_batch } = chunk;

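        // Convert the `RecordBatch` into a `StructArray` so it can be decoded
        // against the shape described by `self.desc`.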
        let struct_array = StructArray::from(record_batch);
        let reader = mz_arrow_util::reader::ArrowReader::new(&self.desc, struct_array)
            .map_err(|err| StorageErrorXKind::ParquetError(err.to_string().into()))
            .context("reader")?;
        let rows_read = reader
            .read_all(rows)
            .map_err(|err| StorageErrorXKind::ParquetError(err.to_string().into()))
            .context("read_all")?;

        Ok(rows_read)
    }
}

/// A newtype wrapper around a [`OneshotSource`] that allows us to implement
/// [`AsyncFileReader`] and [`MetadataFetch`] for all types that implement
/// [`OneshotSource`].
#[derive(Clone)]
struct ParquetReaderAdapter<S: OneshotSource> {
    source: S,
    object: S::Object,
    checksum: S::Checksum,
}

impl<S: OneshotSource> fmt::Debug for ParquetReaderAdapter<S> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("ParquetReaderAdapter").finish()
    }
}

impl<S: OneshotSource> ParquetReaderAdapter<S> {
    fn new(source: S, object: S::Object, checksum: S::Checksum) -> Self {
        ParquetReaderAdapter {
            source,
            object,
            checksum,
        }
    }
}

impl<S: OneshotSource> MetadataFetch for ParquetReaderAdapter<S> {
    fn fetch(
        &mut self,
        range: std::ops::Range<u64>,
    ) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
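        // Parquet requests an exclusive `[start, end)` range, while
        // `OneshotSource::get` takes an inclusive one, so convert (guarding
        // against an empty range).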
        let inclusive_end = std::cmp::max(range.start, range.end.saturating_sub(1));

        Box::pin(async move {
            let range_start = usize::cast_from(range.start);
            let inclusive_end = usize::cast_from(inclusive_end);
            // Fetch the specified range.
            let result: Result<Vec<_>, _> = self
                .source
                .get(
                    self.object.clone(),
                    self.checksum.clone(),
                    Some(range_start..=inclusive_end),
                )
                .try_collect()
                .await;
            let bytes = match result {
                Err(e) => return Err(ParquetError::General(e.to_string())),
                Ok(bytes) => bytes,
            };

            // Join the fetched chunks into a single buffer, with capacity for
            // the length of the originally requested (exclusive) range.
            let total_length = usize::cast_from(range.end.saturating_sub(range.start));
            let mut joined_bytes = BytesMut::with_capacity(total_length);
            joined_bytes.extend(bytes);

            Ok(joined_bytes.freeze())
        })
    }
}

impl<S: OneshotSource> AsyncFileReader for ParquetReaderAdapter<S> {
    fn get_bytes(
        &mut self,
        range: std::ops::Range<u64>,
    ) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
        MetadataFetch::fetch(self, range)
    }

    fn get_metadata<'a>(
        &'a mut self,
        _options: Option<&'a parquet::arrow::arrow_reader::ArrowReaderOptions>,
    ) -> BoxFuture<'a, parquet::errors::Result<Arc<ParquetMetaData>>> {
        Box::pin(async move {
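            // `try_load` fetches the Parquet footer through our
            // `MetadataFetch` impl; the total object size is needed to locate
            // the footer at the end of the file.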
            let mut reader = ParquetMetaDataReader::new();
            let object_size = u64::cast_from(self.object.size());
            reader.try_load(self, object_size).await?;
            reader.finish().map(Arc::new)
        })
    }
}

// Note(parkmycar): Instead of manually implementing Serialize and Deserialize we could change
// `ParquetRowGroup` to wrap a type for which the impls can be derived, but no types from the
// `arrow` crate implement them, and we'd prefer not to store a `Vec<u8>` since serialization is
// only required when Timely workers span multiple processes. (See the round-trip sketch in the
// test module below.)
impl Serialize for ParquetRowGroup {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        // Note: This implementation isn't very efficient (it clones the `RecordBatch` and
        // re-encodes it through protobuf), but it should only rarely be used so that's not
        // too much of a concern.
        let struct_array = StructArray::from(self.record_batch.clone());
        let proto_array: ProtoArrayData = struct_array.into_data().into_proto();
        let encoded_proto = proto_array.encode_to_vec();
        encoded_proto.serialize(serializer)
    }
}

impl<'de> Deserialize<'de> for ParquetRowGroup {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        fn struct_array<'de: 'a, 'a, D: Deserializer<'de>>(
            deserializer: D,
        ) -> Result<StructArray, D::Error> {
            struct StructArrayVisitor;

            impl<'a> Visitor<'a> for StructArrayVisitor {
                type Value = StructArray;

                fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
                    formatter.write_str("binary data")
                }

                fn visit_bytes<E>(self, v: &[u8]) -> Result<Self::Value, E>
                where
                    E: serde::de::Error,
                {
                    let serde_err =
                        || serde::de::Error::invalid_value(serde::de::Unexpected::Bytes(v), &self);

                    let array_data = ProtoArrayData::decode(v)
                        .map_err(|_| serde_err())
                        .and_then(|proto_array| proto_array.into_rust().map_err(|_| serde_err()))?;
                    let array_ref = arrow::array::make_array(array_data);
                    let struct_array = array_ref
                        .as_any()
                        .downcast_ref::<StructArray>()
                        .ok_or_else(serde_err)?;

                    Ok(struct_array.clone())
                }
            }

            deserializer.deserialize_bytes(StructArrayVisitor)
        }

        let struct_array = struct_array(deserializer)?;
        let record_batch = RecordBatch::from(struct_array);

        Ok(ParquetRowGroup { record_batch })
    }
}
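
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow::array::{ArrayRef, Int64Array, RecordBatch};

    use super::ParquetRowGroup;

    /// A minimal round-trip sketch for the manual serde impls above. It assumes a
    /// binary codec like `bincode` (a hypothetical dev-dependency here) whose
    /// `deserialize_bytes` drives `Visitor::visit_bytes`; a self-describing format
    /// like JSON would not exercise that code path.
    #[mz_ore::test]
    fn parquet_row_group_serde_roundtrip() {
        // Build a small single-column `RecordBatch` to wrap.
        let column: ArrayRef = Arc::new(Int64Array::from(vec![1, 2, 3]));
        let record_batch =
            RecordBatch::try_from_iter([("a", column)]).expect("valid RecordBatch");
        let original = ParquetRowGroup { record_batch };

        // Serialize through the protobuf-backed impl and decode it back.
        let encoded = bincode::serialize(&original).expect("failed to serialize");
        let decoded: ParquetRowGroup =
            bincode::deserialize(&encoded).expect("failed to deserialize");

        assert_eq!(original.record_batch, decoded.record_batch);
    }
}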