mz_storage_operators/oneshot_source/parquet.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Parquet [`OneshotFormat`].
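//!
//! An object is split into one unit of work per Parquet row group
//! (`split_work`), each row group is fetched as an Arrow [`RecordBatch`]
//! (`fetch_work`), and each batch is decoded into [`mz_repr::Row`]s against
//! the source's [`RelationDesc`] (`decode_chunk`).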

use std::fmt;
use std::sync::Arc;

use arrow::array::{Array, RecordBatch, StructArray};
use bytes::{Bytes, BytesMut};
use futures::future::BoxFuture;
use futures::stream::BoxStream;
use futures::{StreamExt, TryStreamExt};
use mz_persist_types::arrow::ProtoArrayData;
use mz_proto::{ProtoType, RustType};
use mz_repr::RelationDesc;
use parquet::arrow::ParquetRecordBatchStreamBuilder;
use parquet::arrow::async_reader::{AsyncFileReader, MetadataFetch};
use parquet::errors::ParquetError;
use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
use prost::Message;
use serde::de::Visitor;
use serde::{Deserialize, Deserializer, Serialize};
use smallvec::{SmallVec, smallvec};

use crate::oneshot_source::{
    OneshotFormat, OneshotObject, OneshotSource, StorageErrorX, StorageErrorXContext,
    StorageErrorXKind,
};

/// A [`OneshotFormat`] that decodes rows from Parquet files.
#[derive(Debug, Clone)]
pub struct ParquetFormat {
    /// Description of the rows we'll decode into.
    desc: RelationDesc,
}

impl ParquetFormat {
    pub fn new(desc: RelationDesc) -> Self {
        ParquetFormat { desc }
    }
}

/// A single unit of work: the row groups to fetch from one Parquet object.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ParquetWorkRequest<O, C> {
    object: O,
    checksum: C,
    row_groups: SmallVec<[usize; 1]>,
}

/// A row group fetched from a Parquet object, decoded into an Arrow
/// [`RecordBatch`].
#[derive(Clone, Debug)]
pub struct ParquetRowGroup {
    record_batch: RecordBatch,
}

impl OneshotFormat for ParquetFormat {
    type WorkRequest<S>
        = ParquetWorkRequest<S::Object, S::Checksum>
    where
        S: OneshotSource;
    type RecordChunk = ParquetRowGroup;

    async fn split_work<S: OneshotSource + Send>(
        &self,
        source: S,
        object: S::Object,
        checksum: S::Checksum,
    ) -> Result<Vec<Self::WorkRequest<S>>, StorageErrorX> {
        let mut adapter = ParquetReaderAdapter::new(source, object.clone(), checksum.clone());
        let parquet_metadata = adapter.get_metadata().await?;

        tracing::info!(
            object = object.name(),
            row_groups = parquet_metadata.num_row_groups(),
            "splitting Parquet object"
        );

        // Split up the file by the number of RowGroups.
        //
        // TODO(cf3): Support splitting up large RowGroups.
        let work = (0..parquet_metadata.num_row_groups())
            .map(|row_group| ParquetWorkRequest {
                object: object.clone(),
                checksum: checksum.clone(),
                row_groups: smallvec![row_group],
            })
            .collect();

        Ok(work)
    }

    fn fetch_work<'a, S: OneshotSource + Sync + 'static>(
        &'a self,
        source: &'a S,
        request: Self::WorkRequest<S>,
    ) -> BoxStream<'a, Result<Self::RecordChunk, StorageErrorX>> {
        let ParquetWorkRequest {
            object,
            checksum,
            row_groups,
        } = request;

        let adapter = ParquetReaderAdapter::new(source.clone(), object.clone(), checksum.clone());

        // Construct a stream of `RecordBatch`es limited to the requested row groups.
        let initial_work = async move {
            ParquetRecordBatchStreamBuilder::new(adapter)
                .await?
                .with_row_groups(row_groups.to_vec())
                .build()
        };

        futures::stream::once(initial_work)
            .try_flatten()
            .map_ok(|record_batch| ParquetRowGroup { record_batch })
            .err_into()
            .boxed()
    }

    fn decode_chunk(
        &self,
        chunk: Self::RecordChunk,
        rows: &mut Vec<mz_repr::Row>,
    ) -> Result<usize, StorageErrorX> {
        let ParquetRowGroup { record_batch } = chunk;

        let struct_array = StructArray::from(record_batch);
        let reader = mz_arrow_util::reader::ArrowReader::new(&self.desc, struct_array)
            .map_err(|err| StorageErrorXKind::ParquetError(err.to_string().into()))
            .context("reader")?;
        let rows_read = reader
            .read_all(rows)
            .map_err(|err| StorageErrorXKind::ParquetError(err.to_string().into()))
            .context("read_all")?;

        Ok(rows_read)
    }
}

/// A newtype wrapper around a [`OneshotSource`] that allows us to implement
/// [`AsyncFileReader`] and [`MetadataFetch`] for all types that implement
/// [`OneshotSource`].
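///
/// The `parquet` crate drives this adapter entirely through ranged byte
/// reads, so a [`OneshotSource`] only needs to support fetching ranges of an
/// object rather than whole files.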
#[derive(Clone)]
struct ParquetReaderAdapter<S: OneshotSource> {
    source: S,
    object: S::Object,
    checksum: S::Checksum,
}

impl<S: OneshotSource> fmt::Debug for ParquetReaderAdapter<S> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("ParquetReaderAdapter").finish()
    }
}

impl<S: OneshotSource> ParquetReaderAdapter<S> {
    fn new(source: S, object: S::Object, checksum: S::Checksum) -> Self {
        ParquetReaderAdapter {
            source,
            object,
            checksum,
        }
    }
}

impl<S: OneshotSource> MetadataFetch for ParquetReaderAdapter<S> {
    fn fetch(
        &mut self,
        range: std::ops::Range<usize>,
    ) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
        // `OneshotSource::get` takes an inclusive range, while the `parquet` crate hands us an
        // exclusive one, so convert it, guarding against underflow on an empty range.
        let inclusive_end = std::cmp::max(range.start, range.end.saturating_sub(1));

        Box::pin(async move {
            // Fetch the specified range.
            let result: Result<Vec<_>, _> = self
                .source
                .get(
                    self.object.clone(),
                    self.checksum.clone(),
                    Some(range.start..=inclusive_end),
                )
                .try_collect()
                .await;
            let bytes = match result {
                Err(e) => return Err(ParquetError::General(e.to_string())),
                Ok(bytes) => bytes,
            };

            // Join the stream into a single chunk.
            let total_length = range.end.saturating_sub(range.start);
            let mut joined_bytes = BytesMut::with_capacity(total_length);
            joined_bytes.extend(bytes);

            Ok(joined_bytes.freeze())
        })
    }
}

impl<S: OneshotSource> AsyncFileReader for ParquetReaderAdapter<S> {
    fn get_bytes(
        &mut self,
        range: std::ops::Range<usize>,
    ) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
        MetadataFetch::fetch(self, range)
    }

    fn get_metadata(&mut self) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
        Box::pin(async move {
            // Read the Parquet metadata from the footer of the object.
            let mut reader = ParquetMetaDataReader::new();
            let object_size = self.object.size();
            reader.try_load(self, object_size).await?;
            reader.finish().map(Arc::new)
        })
    }
}
219
220// Note(parkmycar): Instead of a manual implementation of Serialize and Deserialize we could
221// change `ParquetRowGroup` to have a type which we can derive the impl for. But no types from the
222// `arrow` crate do, and we'd prefer not to use a Vec<u8> since serialization is only required when
223// Timely workers span multiple processes.
224impl Serialize for ParquetRowGroup {
225    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
226    where
227        S: serde::Serializer,
228    {
229        // Note: This implementation isn't very efficient, but it should only be rarely used so
230        // it's not too much of a concern.
231        let struct_array = StructArray::from(self.record_batch.clone());
232        let proto_array: ProtoArrayData = struct_array.into_data().into_proto();
233        let encoded_proto = proto_array.encode_to_vec();
234        encoded_proto.serialize(serializer)
235    }
236}

impl<'de> Deserialize<'de> for ParquetRowGroup {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        fn struct_array<'de: 'a, 'a, D: Deserializer<'de>>(
            deserializer: D,
        ) -> Result<StructArray, D::Error> {
            struct StructArrayVisitor;

            impl<'a> Visitor<'a> for StructArrayVisitor {
                type Value = StructArray;

                fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
                    formatter.write_str("binary data")
                }

                fn visit_bytes<E>(self, v: &[u8]) -> Result<Self::Value, E>
                where
                    E: serde::de::Error,
                {
                    let serde_err =
                        || serde::de::Error::invalid_value(serde::de::Unexpected::Bytes(v), &self);

                    let array_data = ProtoArrayData::decode(v)
                        .map_err(|_| serde_err())
                        .and_then(|proto_array| proto_array.into_rust().map_err(|_| serde_err()))?;
                    let array_ref = arrow::array::make_array(array_data);
                    let struct_array = array_ref
                        .as_any()
                        .downcast_ref::<StructArray>()
                        .ok_or_else(serde_err)?;

                    Ok(struct_array.clone())
                }
            }

            deserializer.deserialize_bytes(StructArrayVisitor)
        }

        let struct_array = struct_array(deserializer)?;
        let record_batch = RecordBatch::from(struct_array);

        Ok(ParquetRowGroup { record_batch })
    }
}
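
// A minimal round-trip sketch for the manual Serialize/Deserialize impls above. It assumes
// `bincode` is available as a dev-dependency; any non-self-describing serde format that drives
// `deserialize_bytes` with `visit_bytes` would work equally well, while self-describing formats
// like JSON would not, since the visitor only implements `visit_bytes`.
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow::array::{ArrayRef, Int64Array, RecordBatch};

    use super::ParquetRowGroup;

    #[test]
    fn parquet_row_group_roundtrip() {
        // Build a tiny single-column RecordBatch to wrap in a `ParquetRowGroup`.
        let column: ArrayRef = Arc::new(Int64Array::from(vec![1, 2, 3]));
        let record_batch = RecordBatch::try_from_iter([("a", column)]).expect("valid batch");
        let chunk = ParquetRowGroup { record_batch };

        // Serialize to the protobuf-backed byte representation and back.
        let encoded = bincode::serialize(&chunk).expect("serialize");
        let decoded: ParquetRowGroup = bincode::deserialize(&encoded).expect("deserialize");

        assert_eq!(chunk.record_batch, decoded.record_batch);
    }
}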