mz_storage_operators/oneshot_source/
parquet.rs1use std::fmt;
13use std::sync::Arc;
14
15use arrow::array::{Array, RecordBatch, StructArray};
16use bytes::{Bytes, BytesMut};
17use futures::future::BoxFuture;
18use futures::stream::BoxStream;
19use futures::{StreamExt, TryStreamExt};
20use mz_ore::cast::CastFrom;
21use mz_persist_types::arrow::ProtoArrayData;
22use mz_proto::{ProtoType, RustType};
23use mz_repr::RelationDesc;
24use parquet::arrow::ParquetRecordBatchStreamBuilder;
25use parquet::arrow::async_reader::{AsyncFileReader, MetadataFetch};
26use parquet::errors::ParquetError;
27use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
28use prost::Message;
29use serde::de::Visitor;
30use serde::{Deserialize, Deserializer, Serialize};
31use smallvec::{SmallVec, smallvec};
32
33use crate::oneshot_source::{
34 OneshotFormat, OneshotObject, OneshotSource, StorageErrorX, StorageErrorXContext,
35 StorageErrorXKind,
36};
37
38#[derive(Debug, Clone)]
39pub struct ParquetFormat {
40 desc: RelationDesc,
41}
42
43impl ParquetFormat {
44 pub fn new(desc: RelationDesc) -> Self {
45 ParquetFormat { desc }
46 }
47}
48
49#[derive(Clone, Debug, Serialize, Deserialize)]
50pub struct ParquetWorkRequest<O, C> {
51 object: O,
52 checksum: C,
53 row_groups: SmallVec<[usize; 1]>,
54}
55
56#[derive(Clone, Debug)]
57pub struct ParquetRowGroup {
58 record_batch: RecordBatch,
59}
60
61impl OneshotFormat for ParquetFormat {
62 type WorkRequest<S>
63 = ParquetWorkRequest<S::Object, S::Checksum>
64 where
65 S: OneshotSource;
66 type RecordChunk = ParquetRowGroup;
67
68 async fn split_work<S: OneshotSource + Send>(
69 &self,
70 source: S,
71 object: S::Object,
72 checksum: S::Checksum,
73 ) -> Result<Vec<Self::WorkRequest<S>>, StorageErrorX> {
74 let mut adapter = ParquetReaderAdapter::new(source, object.clone(), checksum.clone());
75 let parquet_metadata = adapter.get_metadata(None).await?;
76
77 tracing::info!(
78 object = object.name(),
79 row_groups = parquet_metadata.num_row_groups(),
80 "splitting Parquet object"
81 );
82
83 let work = (0..parquet_metadata.num_row_groups())
87 .map(|row_group| ParquetWorkRequest {
88 object: object.clone(),
89 checksum: checksum.clone(),
90 row_groups: smallvec![row_group],
91 })
92 .collect();
93
94 Ok(work)
95 }
96
97 fn fetch_work<'a, S: OneshotSource + Sync + 'static>(
98 &'a self,
99 source: &'a S,
100 request: Self::WorkRequest<S>,
101 ) -> BoxStream<'a, Result<Self::RecordChunk, StorageErrorX>> {
102 let ParquetWorkRequest {
103 object,
104 checksum,
105 row_groups,
106 } = request;
107
108 let adapter = ParquetReaderAdapter::new(source.clone(), object.clone(), checksum.clone());
109
110 let initial_work = async move {
111 ParquetRecordBatchStreamBuilder::new(adapter)
112 .await?
113 .with_row_groups(row_groups.to_vec())
114 .build()
115 };
116
117 futures::stream::once(initial_work)
118 .try_flatten()
119 .map_ok(|record_batch| ParquetRowGroup { record_batch })
120 .err_into()
121 .boxed()
122 }
123
124 fn decode_chunk(
125 &self,
126 chunk: Self::RecordChunk,
127 rows: &mut Vec<mz_repr::Row>,
128 ) -> Result<usize, StorageErrorX> {
129 let ParquetRowGroup { record_batch } = chunk;
130
131 let struct_array = StructArray::from(record_batch);
132 let reader = mz_arrow_util::reader::ArrowReader::new(&self.desc, struct_array)
133 .map_err(|err| StorageErrorXKind::ParquetError(err.to_string().into()))
134 .context("reader")?;
135 let rows_read = reader
136 .read_all(rows)
137 .map_err(|err| StorageErrorXKind::ParquetError(err.to_string().into()))
138 .context("read_all")?;
139
140 Ok(rows_read)
141 }
142}
143
144#[derive(Clone)]
148struct ParquetReaderAdapter<S: OneshotSource> {
149 source: S,
150 object: S::Object,
151 checksum: S::Checksum,
152}
153
154impl<S: OneshotSource> fmt::Debug for ParquetReaderAdapter<S> {
155 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
156 f.debug_struct("ObjectStoreAdapter").finish()
157 }
158}
159
160impl<S: OneshotSource> ParquetReaderAdapter<S> {
161 fn new(source: S, object: S::Object, checksum: S::Checksum) -> Self {
162 ParquetReaderAdapter {
163 source,
164 object,
165 checksum,
166 }
167 }
168}
169
170impl<S: OneshotSource> MetadataFetch for ParquetReaderAdapter<S> {
171 fn fetch(
172 &mut self,
173 range: std::ops::Range<u64>,
174 ) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
175 let inclusive_end = std::cmp::max(range.start, range.end.saturating_sub(1));
176
177 Box::pin(async move {
178 let range_start = usize::cast_from(range.start);
179 let inclusive_end = usize::cast_from(inclusive_end);
180 let result: Result<Vec<_>, _> = self
182 .source
183 .get(
184 self.object.clone(),
185 self.checksum.clone(),
186 Some(range_start..=inclusive_end),
187 )
188 .try_collect()
189 .await;
190 let bytes = match result {
191 Err(e) => return Err(ParquetError::General(e.to_string())),
192 Ok(bytes) => bytes,
193 };
194
195 let total_length = inclusive_end.saturating_sub(range_start);
197 let mut joined_bytes = BytesMut::with_capacity(total_length);
198 joined_bytes.extend(bytes);
199
200 Ok(joined_bytes.freeze())
201 })
202 }
203}
204
205impl<S: OneshotSource> AsyncFileReader for ParquetReaderAdapter<S> {
206 fn get_bytes(
207 &mut self,
208 range: std::ops::Range<u64>,
209 ) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
210 MetadataFetch::fetch(self, range)
211 }
212
213 fn get_metadata<'a>(
214 &'a mut self,
215 _options: Option<&'a parquet::arrow::arrow_reader::ArrowReaderOptions>,
216 ) -> BoxFuture<'a, parquet::errors::Result<Arc<ParquetMetaData>>> {
217 Box::pin(async move {
218 let mut reader = ParquetMetaDataReader::new();
219 let object_size = u64::cast_from(self.object.size());
220 reader.try_load(self, object_size).await?;
221 reader.finish().map(Arc::new)
222 })
223 }
224}
225
226impl Serialize for ParquetRowGroup {
231 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
232 where
233 S: serde::Serializer,
234 {
235 let struct_array = StructArray::from(self.record_batch.clone());
238 let proto_array: ProtoArrayData = struct_array.into_data().into_proto();
239 let encoded_proto = proto_array.encode_to_vec();
240 encoded_proto.serialize(serializer)
241 }
242}
243
244impl<'de> Deserialize<'de> for ParquetRowGroup {
245 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
246 where
247 D: serde::Deserializer<'de>,
248 {
249 fn struct_array<'de: 'a, 'a, D: Deserializer<'de>>(
250 deserializer: D,
251 ) -> Result<StructArray, D::Error> {
252 struct StructArrayVisitor;
253
254 impl<'a> Visitor<'a> for StructArrayVisitor {
255 type Value = StructArray;
256
257 fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
258 formatter.write_str("binary data")
259 }
260
261 fn visit_bytes<E>(self, v: &[u8]) -> Result<Self::Value, E>
262 where
263 E: serde::de::Error,
264 {
265 let serde_err =
266 || serde::de::Error::invalid_value(serde::de::Unexpected::Bytes(v), &self);
267
268 let array_data = ProtoArrayData::decode(v)
269 .map_err(|_| serde_err())
270 .and_then(|proto_array| proto_array.into_rust().map_err(|_| serde_err()))?;
271 let array_ref = arrow::array::make_array(array_data);
272 let struct_array = array_ref
273 .as_any()
274 .downcast_ref::<StructArray>()
275 .ok_or_else(serde_err)?;
276
277 Ok(struct_array.clone())
278 }
279 }
280
281 deserializer.deserialize_bytes(StructArrayVisitor)
282 }
283
284 let struct_array = struct_array(deserializer)?;
285 let record_batch = RecordBatch::from(struct_array);
286
287 Ok(ParquetRowGroup { record_batch })
288 }
289}