// mz_storage_operators/oneshot_source/parquet.rs

use std::fmt;
use std::sync::Arc;

use arrow::array::{RecordBatch, StructArray};
use arrow_ipc::writer::StreamWriter;
use bytes::{Bytes, BytesMut};
use futures::future::BoxFuture;
use futures::stream::BoxStream;
use futures::{StreamExt, TryStreamExt};
use mz_ore::cast::CastFrom;
use mz_ore::collections::CollectionExt;
use mz_repr::RelationDesc;
use parquet::arrow::ParquetRecordBatchStreamBuilder;
use parquet::arrow::async_reader::{AsyncFileReader, MetadataFetch};
use parquet::errors::ParquetError;
use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
use serde::{Deserialize, Serialize};
use smallvec::{SmallVec, smallvec};

use crate::oneshot_source::{
    OneshotFormat, OneshotObject, OneshotSource, StorageErrorX, StorageErrorXContext,
    StorageErrorXKind,
};

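/// A [`OneshotFormat`] that decodes Parquet files into [`mz_repr::Row`]s,
/// guided by a [`RelationDesc`] describing the expected schema.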
#[derive(Debug, Clone)]
pub struct ParquetFormat {
    desc: RelationDesc,
}

impl ParquetFormat {
    pub fn new(desc: RelationDesc) -> Self {
        ParquetFormat { desc }
    }
}

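/// A single unit of work: the row groups to fetch and decode from one object.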
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ParquetWorkRequest<O, C> {
    object: O,
    checksum: C,
    row_groups: SmallVec<[usize; 1]>,
}

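/// A chunk of decoded records, carried as an Arrow [`RecordBatch`].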
#[derive(Clone, Debug)]
pub struct ParquetRowGroup {
    record_batch: RecordBatch,
}

impl OneshotFormat for ParquetFormat {
    type WorkRequest<S>
        = ParquetWorkRequest<S::Object, S::Checksum>
    where
        S: OneshotSource;
    type RecordChunk = ParquetRowGroup;

    async fn split_work<S: OneshotSource + Send>(
        &self,
        source: S,
        object: S::Object,
        checksum: S::Checksum,
    ) -> Result<Vec<Self::WorkRequest<S>>, StorageErrorX> {
        let mut adapter = ParquetReaderAdapter::new(source, object.clone(), checksum.clone());
        let parquet_metadata = adapter.get_metadata(None).await?;

        tracing::info!(
            object = object.name(),
            schema = ?parquet_metadata.file_metadata().schema_descr(),
            row_groups = parquet_metadata.num_row_groups(),
            "splitting Parquet object"
        );

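        // Emit one work request per row group, so row groups can be fetched
        // and decoded independently.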
        let work = (0..parquet_metadata.num_row_groups())
            .map(|row_group| ParquetWorkRequest {
                object: object.clone(),
                checksum: checksum.clone(),
                row_groups: smallvec![row_group],
            })
            .collect();

        Ok(work)
    }

    fn fetch_work<'a, S: OneshotSource + Sync + 'static>(
        &'a self,
        source: &'a S,
        request: Self::WorkRequest<S>,
    ) -> BoxStream<'a, Result<Self::RecordChunk, StorageErrorX>> {
        let ParquetWorkRequest {
            object,
            checksum,
            row_groups,
        } = request;

        let adapter = ParquetReaderAdapter::new(source.clone(), object.clone(), checksum.clone());

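        // Opening the Parquet reader is itself async, so wrap that future in
        // a single-element stream and flatten it into the stream of record
        // batches it yields.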
        let initial_work = async move {
            ParquetRecordBatchStreamBuilder::new(adapter)
                .await?
                .with_row_groups(row_groups.to_vec())
                .build()
        };

        futures::stream::once(initial_work)
            .try_flatten()
            .map_ok(|record_batch| ParquetRowGroup { record_batch })
            .err_into()
            .boxed()
    }

    fn decode_chunk(
        &self,
        chunk: Self::RecordChunk,
        rows: &mut Vec<mz_repr::Row>,
    ) -> Result<usize, StorageErrorX> {
        let ParquetRowGroup { record_batch } = chunk;

        let struct_array = StructArray::from(record_batch);
        let reader = mz_arrow_util::reader::ArrowReader::new(&self.desc, struct_array)
            .inspect_err(|err| tracing::warn!("error: {err:#?}"))
            .map_err(|err| StorageErrorXKind::ParquetError(err.to_string().into()))
            .context("reader")?;
        let rows_read = reader
            .read_all(rows)
            .inspect_err(|err| tracing::warn!("error: {err:#?}"))
            .map_err(|err| StorageErrorXKind::ParquetError(err.to_string().into()))
            .context("read_all")?;

        Ok(rows_read)
    }
}

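/// An adapter that implements the `parquet` crate's [`AsyncFileReader`] and
/// [`MetadataFetch`] traits on top of a [`OneshotSource`].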
#[derive(Clone)]
struct ParquetReaderAdapter<S: OneshotSource> {
    source: S,
    object: S::Object,
    checksum: S::Checksum,
}

impl<S: OneshotSource> fmt::Debug for ParquetReaderAdapter<S> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("ParquetReaderAdapter").finish()
    }
}

impl<S: OneshotSource> ParquetReaderAdapter<S> {
    fn new(source: S, object: S::Object, checksum: S::Checksum) -> Self {
        ParquetReaderAdapter {
            source,
            object,
            checksum,
        }
    }
}

impl<S: OneshotSource> MetadataFetch for ParquetReaderAdapter<S> {
    fn fetch(
        &mut self,
        range: std::ops::Range<u64>,
    ) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
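        // The `parquet` crate hands us an exclusive end, but
        // `OneshotSource::get` takes an inclusive range, so convert here
        // (guarding against an empty range).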
        let inclusive_end = std::cmp::max(range.start, range.end.saturating_sub(1));

        Box::pin(async move {
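            // Fetch the specified byte range from the source.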
            let range_start = usize::cast_from(range.start);
            let inclusive_end = usize::cast_from(inclusive_end);
            let result: Result<Vec<_>, _> = self
                .source
                .get(
                    self.object.clone(),
                    self.checksum.clone(),
                    Some(range_start..=inclusive_end),
                )
                .try_collect()
                .await;
            let bytes = match result {
                Err(e) => return Err(ParquetError::General(e.to_string())),
                Ok(bytes) => bytes,
            };
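
            // Join the stream of `Bytes` into one contiguous buffer.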
            // Note: `+ 1` because both ends of the range are inclusive.
            let total_length = inclusive_end.saturating_sub(range_start) + 1;
            let mut joined_bytes = BytesMut::with_capacity(total_length);
            joined_bytes.extend(bytes);

            Ok(joined_bytes.freeze())
        })
    }
}

impl<S: OneshotSource> AsyncFileReader for ParquetReaderAdapter<S> {
    fn get_bytes(
        &mut self,
        range: std::ops::Range<u64>,
    ) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
        MetadataFetch::fetch(self, range)
    }

    fn get_metadata<'a>(
        &'a mut self,
        _options: Option<&'a parquet::arrow::arrow_reader::ArrowReaderOptions>,
    ) -> BoxFuture<'a, parquet::errors::Result<Arc<ParquetMetaData>>> {
        Box::pin(async move {
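            // Load the footer metadata; the reader needs the object's total
            // size to locate the footer.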
            let mut reader = ParquetMetaDataReader::new();
            let object_size = u64::cast_from(self.object.size());
            reader.try_load(self, object_size).await?;
            reader.finish().map(Arc::new)
        })
    }
}

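// `RecordBatch` does not implement `Serialize`, so we round-trip batches
// through the Arrow IPC stream format instead.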
impl Serialize for ParquetRowGroup {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
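        // Encode the `RecordBatch` into an in-memory Arrow IPC stream.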
        let mut buf = Vec::new();
        let mut writer = StreamWriter::try_new(&mut buf, &self.record_batch.schema())
            .map_err(serde::ser::Error::custom)?;
        writer
            .write(&self.record_batch)
            .map_err(serde::ser::Error::custom)?;
        writer.finish().map_err(serde::ser::Error::custom)?;
        buf.serialize(serializer)
    }
}

impl<'de> Deserialize<'de> for ParquetRowGroup {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
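        // Decode the Arrow IPC stream written by `serialize`, which contains
        // exactly one `RecordBatch`.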
        let bytes = <Vec<u8>>::deserialize(deserializer)?;
        let reader = arrow_ipc::reader::StreamReader::try_new(bytes.as_slice(), None)
            .map_err(serde::de::Error::custom)?;

        let record_batch = reader
            .expect_element(|| "expected exactly one record batch in IPC stream")
            .map_err(serde::de::Error::custom)?;

        Ok(ParquetRowGroup { record_batch })
    }
}