// mz_storage_operators/oneshot_source/parquet.rs
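//! Parquet [`OneshotFormat`] support: splits Parquet objects into row groups
//! and decodes the resulting Arrow data into [`mz_repr::Row`]s.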
use std::fmt;
use std::sync::Arc;

use arrow::array::{Array, RecordBatch, StructArray};
use bytes::{Bytes, BytesMut};
use futures::future::BoxFuture;
use futures::stream::BoxStream;
use futures::{StreamExt, TryStreamExt};
use mz_persist_types::arrow::ProtoArrayData;
use mz_proto::{ProtoType, RustType};
use mz_repr::RelationDesc;
use parquet::arrow::ParquetRecordBatchStreamBuilder;
use parquet::arrow::async_reader::{AsyncFileReader, MetadataFetch};
use parquet::errors::ParquetError;
use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
use prost::Message;
use serde::de::Visitor;
use serde::{Deserialize, Deserializer, Serialize};
use smallvec::{SmallVec, smallvec};

use crate::oneshot_source::{
    OneshotFormat, OneshotObject, OneshotSource, StorageErrorX, StorageErrorXContext,
    StorageErrorXKind,
};

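/// A [`OneshotFormat`] that decodes Parquet data, using a [`RelationDesc`] to
/// describe the shape of the output rows.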
#[derive(Debug, Clone)]
pub struct ParquetFormat {
    desc: RelationDesc,
}

impl ParquetFormat {
    pub fn new(desc: RelationDesc) -> Self {
        ParquetFormat { desc }
    }
}

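/// A single unit of work when reading a Parquet file: the object to read, its
/// checksum, and the row groups to fetch from it.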
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ParquetWorkRequest<O, C> {
    object: O,
    checksum: C,
    row_groups: SmallVec<[usize; 1]>,
}

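/// A chunk of records decoded from a Parquet file: the [`RecordBatch`] for a
/// single row group.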
#[derive(Clone, Debug)]
pub struct ParquetRowGroup {
    record_batch: RecordBatch,
}

impl OneshotFormat for ParquetFormat {
    type WorkRequest<S>
        = ParquetWorkRequest<S::Object, S::Checksum>
    where
        S: OneshotSource;
    type RecordChunk = ParquetRowGroup;

    async fn split_work<S: OneshotSource + Send>(
        &self,
        source: S,
        object: S::Object,
        checksum: S::Checksum,
    ) -> Result<Vec<Self::WorkRequest<S>>, StorageErrorX> {
        let mut adapter = ParquetReaderAdapter::new(source, object.clone(), checksum.clone());
        let parquet_metadata = adapter.get_metadata().await?;

        tracing::info!(
            object = object.name(),
            row_groups = parquet_metadata.num_row_groups(),
            "splitting Parquet object"
        );

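        // Split the file into work requests, one per row group.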
        let work = (0..parquet_metadata.num_row_groups())
            .map(|row_group| ParquetWorkRequest {
                object: object.clone(),
                checksum: checksum.clone(),
                row_groups: smallvec![row_group],
            })
            .collect();

        Ok(work)
    }

    fn fetch_work<'a, S: OneshotSource + Sync + 'static>(
        &'a self,
        source: &'a S,
        request: Self::WorkRequest<S>,
    ) -> BoxStream<'a, Result<Self::RecordChunk, StorageErrorX>> {
        let ParquetWorkRequest {
            object,
            checksum,
            row_groups,
        } = request;

        let adapter = ParquetReaderAdapter::new(source.clone(), object.clone(), checksum.clone());

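        // Open a record batch stream over just the requested row groups.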
        let initial_work = async move {
            ParquetRecordBatchStreamBuilder::new(adapter)
                .await?
                .with_row_groups(row_groups.to_vec())
                .build()
        };

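        // Run the async setup as the first element of the stream, then flatten
        // the resulting stream of record batches into it.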
        futures::stream::once(initial_work)
            .try_flatten()
            .map_ok(|record_batch| ParquetRowGroup { record_batch })
            .err_into()
            .boxed()
    }

    fn decode_chunk(
        &self,
        chunk: Self::RecordChunk,
        rows: &mut Vec<mz_repr::Row>,
    ) -> Result<usize, StorageErrorX> {
        let ParquetRowGroup { record_batch } = chunk;

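        // Decode the Arrow data into `Row`s, using the `RelationDesc` this
        // format was created with.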
        let struct_array = StructArray::from(record_batch);
        let reader = mz_arrow_util::reader::ArrowReader::new(&self.desc, struct_array)
            .map_err(|err| StorageErrorXKind::ParquetError(err.to_string().into()))
            .context("reader")?;
        let rows_read = reader
            .read_all(rows)
            .map_err(|err| StorageErrorXKind::ParquetError(err.to_string().into()))
            .context("read_all")?;

        Ok(rows_read)
    }
}

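/// An adapter that implements [`AsyncFileReader`] and [`MetadataFetch`] on top
/// of a [`OneshotSource`], so the `parquet` crate can read directly from it.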
#[derive(Clone)]
struct ParquetReaderAdapter<S: OneshotSource> {
    source: S,
    object: S::Object,
    checksum: S::Checksum,
}

impl<S: OneshotSource> fmt::Debug for ParquetReaderAdapter<S> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("ParquetReaderAdapter").finish()
    }
}

impl<S: OneshotSource> ParquetReaderAdapter<S> {
    fn new(source: S, object: S::Object, checksum: S::Checksum) -> Self {
        ParquetReaderAdapter {
            source,
            object,
            checksum,
        }
    }
}

impl<S: OneshotSource> MetadataFetch for ParquetReaderAdapter<S> {
    fn fetch(
        &mut self,
        range: std::ops::Range<usize>,
    ) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
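        // `OneshotSource::get` takes an inclusive range, while the `parquet`
        // APIs hand us an exclusive one, so convert between the two.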
        let inclusive_end = std::cmp::max(range.start, range.end.saturating_sub(1));

        Box::pin(async move {
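            // Fetch the requested byte range from the underlying source.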
            let result: Result<Vec<_>, _> = self
                .source
                .get(
                    self.object.clone(),
                    self.checksum.clone(),
                    Some(range.start..=inclusive_end),
                )
                .try_collect()
                .await;
            let bytes = match result {
                Err(e) => return Err(ParquetError::General(e.to_string())),
                Ok(bytes) => bytes,
            };

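            // Stitch the returned chunks back together into a single buffer.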
            let total_length = range.end.saturating_sub(range.start);
            let mut joined_bytes = BytesMut::with_capacity(total_length);
            joined_bytes.extend(bytes);

            Ok(joined_bytes.freeze())
        })
    }
}

impl<S: OneshotSource> AsyncFileReader for ParquetReaderAdapter<S> {
    fn get_bytes(
        &mut self,
        range: std::ops::Range<usize>,
    ) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
        MetadataFetch::fetch(self, range)
    }

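    // Read the Parquet footer and metadata, issuing range reads through `self`.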
    fn get_metadata(&mut self) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
        Box::pin(async move {
            let mut reader = ParquetMetaDataReader::new();
            let object_size = self.object.size();
            reader.try_load(self, object_size).await?;
            reader.finish().map(Arc::new)
        })
    }
}

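// Serde support for `ParquetRowGroup`: the `RecordBatch` is converted to a
// single `StructArray` and round-tripped through persist's `ProtoArrayData`
// protobuf encoding of Arrow data. Note that this requires cloning the batch.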
impl Serialize for ParquetRowGroup {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        let struct_array = StructArray::from(self.record_batch.clone());
        let proto_array: ProtoArrayData = struct_array.into_data().into_proto();
        let encoded_proto = proto_array.encode_to_vec();
        encoded_proto.serialize(serializer)
    }
}

impl<'de> Deserialize<'de> for ParquetRowGroup {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
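        // Helper that decodes the protobuf bytes back into a `StructArray`.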
        fn struct_array<'de: 'a, 'a, D: Deserializer<'de>>(
            deserializer: D,
        ) -> Result<StructArray, D::Error> {
            struct StructArrayVisitor;

            impl<'a> Visitor<'a> for StructArrayVisitor {
                type Value = StructArray;

                fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
                    formatter.write_str("binary data")
                }

                fn visit_bytes<E>(self, v: &[u8]) -> Result<Self::Value, E>
                where
                    E: serde::de::Error,
                {
                    let serde_err =
                        || serde::de::Error::invalid_value(serde::de::Unexpected::Bytes(v), &self);

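                    // Decode the protobuf and rebuild the Arrow array; any
                    // failure is reported as an invalid-value error.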
                    let array_data = ProtoArrayData::decode(v)
                        .map_err(|_| serde_err())
                        .and_then(|proto_array| proto_array.into_rust().map_err(|_| serde_err()))?;
                    let array_ref = arrow::array::make_array(array_data);
                    let struct_array = array_ref
                        .as_any()
                        .downcast_ref::<StructArray>()
                        .ok_or_else(serde_err)?;

                    Ok(struct_array.clone())
                }
            }

            deserializer.deserialize_bytes(StructArrayVisitor)
        }

        let struct_array = struct_array(deserializer)?;
        let record_batch = RecordBatch::from(struct_array);

        Ok(ParquetRowGroup { record_batch })
    }
}