use std::collections::{BTreeSet, LinkedList};
use std::fmt::{self, Debug, Display};
use std::future::Future;
use std::sync::Arc;

use aws_sdk_s3::error::DisplayErrorContext;
use bytes::Bytes;
use differential_dataflow::Hashable;
use futures::stream::BoxStream;
use futures::{StreamExt, TryStreamExt};
use mz_expr::SafeMfpPlan;
use mz_ore::cast::CastFrom;
use mz_persist_client::Diagnostics;
use mz_persist_client::batch::ProtoBatch;
use mz_persist_client::cache::PersistClientCache;
use mz_persist_types::Codec;
use mz_persist_types::codec_impls::UnitSchema;
use mz_repr::{DatumVec, GlobalId, Row, RowArena, Timestamp};
use mz_storage_types::StorageDiff;
use mz_storage_types::connections::ConnectionContext;
use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::oneshot_sources::{
    ContentFilter, ContentFormat, ContentSource, OneshotIngestionRequest,
};
use mz_storage_types::sources::SourceData;
use mz_timely_util::builder_async::{
    Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
};
use mz_timely_util::pact::Distribute;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use timely::container::CapacityContainerBuilder;
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::{Scope, StreamVec};
use timely::progress::Antichain;
use tracing::info;

use crate::oneshot_source::aws_source::{AwsS3Source, S3Checksum, S3Object};
use crate::oneshot_source::csv::{CsvDecoder, CsvRecord, CsvWorkRequest};
use crate::oneshot_source::http_source::{HttpChecksum, HttpObject, HttpOneshotSource};
use crate::oneshot_source::parquet::{ParquetFormat, ParquetRowGroup, ParquetWorkRequest};

pub mod csv;
pub mod parquet;

pub mod aws_source;
pub mod http_source;

mod util;

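/// Renders the full "oneshot" ingestion pipeline: discover objects, split
/// them into work requests, fetch and decode the data, and stage it as a
/// Persist batch that is reported back via `worker_callback`.
///
/// A minimal sketch of how a caller might drive this from inside a timely
/// worker. Everything here is illustrative: `timely_worker`, the `request`
/// value, and the channel used by the callback are assumptions, not part of
/// this module.
///
/// ```ignore
/// timely_worker.dataflow(|scope| {
///     let (tx, _rx) = std::sync::mpsc::channel();
///     let tokens = render(
///         scope.clone(),
///         Arc::clone(&persist_clients),
///         connection_context.clone(),
///         collection_id,
///         collection_meta.clone(),
///         request,
///         move |result| {
///             // Forward the staged `ProtoBatch` (or error) to whoever
///             // coordinates the final append.
///             let _ = tx.send(result);
///         },
///     );
///     // Keep `tokens` alive for as long as the ingestion should run;
///     // dropping them shuts the dataflow down.
///     tokens
/// });
/// ```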
pub fn render<'scope, F>(
    scope: Scope<'scope, Timestamp>,
    persist_clients: Arc<PersistClientCache>,
    connection_context: ConnectionContext,
    collection_id: GlobalId,
    collection_meta: CollectionMetadata,
    request: OneshotIngestionRequest,
    worker_callback: F,
) -> Vec<PressOnDropButton>
where
    F: FnOnce(Result<Option<ProtoBatch>, String>) -> () + 'static,
{
    let OneshotIngestionRequest {
        source,
        format,
        filter,
        shape,
    } = request;

    let source = match source {
        ContentSource::Http { url } => {
            let source = HttpOneshotSource::new(reqwest::Client::default(), url);
            SourceKind::Http(source)
        }
        ContentSource::AwsS3 {
            connection,
            connection_id,
            uri,
        } => {
            // Checksum validation is skipped only when the endpoint is overridden to a
            // non-AWS host (e.g. a MinIO-style endpoint) *and* the format is Parquet.
            let non_aws_endpoint = connection
                .endpoint
                .as_ref()
                .is_some_and(|endpoint| !endpoint.contains("amazonaws"));
            let use_checksum = !(non_aws_endpoint && format == ContentFormat::Parquet);
            if !use_checksum {
                tracing::info!(
                    "disabling checksum validation for S3 source because endpoint {:?} is overridden and the format is Parquet",
                    connection.endpoint
                );
            }
            let source = AwsS3Source::new(
                connection,
                connection_id,
                connection_context,
                uri,
                use_checksum,
            );
            SourceKind::AwsS3(source)
        }
    };
    tracing::info!(?source, "created oneshot source");

    let format = match format {
        ContentFormat::Csv(params) => {
            let format = CsvDecoder::new(params, &shape.source_desc);
            FormatKind::Csv(format)
        }
        ContentFormat::Parquet => {
            let format = ParquetFormat::new(shape.source_desc);
            FormatKind::Parquet(format)
        }
    };

    // Discover the objects that are available to copy.
    let (objects_stream, discover_token) =
        render_discover_objects(scope.clone(), collection_id, source.clone(), filter);
    // Split each object into individual units of work.
    let (work_stream, split_token) = render_split_work(
        scope.clone(),
        collection_id,
        objects_stream,
        source.clone(),
        format.clone(),
    );
    // Fetch each unit of work, returning chunks of records.
    let (records_stream, fetch_token) = render_fetch_work(
        scope.clone(),
        collection_id,
        source.clone(),
        format.clone(),
        work_stream,
    );
    // Decode chunks of records into Rows.
    let (rows_stream, decode_token) = render_decode_chunk(
        scope.clone(),
        format.clone(),
        records_stream,
        shape.source_mfp,
    );
    // Stage the Rows as a batch in Persist.
    let (batch_stream, batch_token) = render_stage_batches_operator(
        scope.clone(),
        collection_id,
        &collection_meta,
        persist_clients,
        rows_stream,
    );

    // Report the staged batches back to the caller.
    render_completion_operator(scope, batch_stream, worker_callback);

    let tokens = vec![
        discover_token,
        split_token,
        fetch_token,
        decode_token,
        batch_token,
    ];

    tokens
}

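/// Renders the "discover" operator, which lists the objects available at the
/// source and filters them down to the set we intend to ingest.
///
/// Discovery runs on a single worker, chosen by hashing the collection's
/// `GlobalId` together with an operator-specific tag. A self-contained sketch
/// of that selection idiom (the worker counts here are just for
/// illustration):
///
/// ```ignore
/// use differential_dataflow::Hashable;
/// use mz_ore::cast::CastFrom;
///
/// let num_workers = 4;
/// let worker_id = 2;
/// let active_worker_id =
///     usize::cast_from((collection_id, "discover").hashed()) % num_workers;
/// // Every worker computes the same hash, so exactly one elects itself;
/// // the rest return early without producing any data.
/// let is_active_worker = worker_id == active_worker_id;
/// ```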
pub fn render_discover_objects<'scope, S>(
    scope: Scope<'scope, Timestamp>,
    collection_id: GlobalId,
    source: S,
    filter: ContentFilter,
) -> (
    StreamVec<'scope, Timestamp, Result<(S::Object, S::Checksum), StorageErrorX>>,
    PressOnDropButton,
)
where
    S: OneshotSource + 'static,
{
    // Only a single worker is responsible for discovering objects.
    let worker_id = scope.index();
    let num_workers = scope.peers();
    let active_worker_id = usize::cast_from((collection_id, "discover").hashed()) % num_workers;
    let is_active_worker = worker_id == active_worker_id;

    let mut builder = AsyncOperatorBuilder::new("CopyFrom-discover".to_string(), scope.clone());

    let (start_handle, start_stream) = builder.new_output::<CapacityContainerBuilder<_>>();

    let shutdown = builder.build(move |caps| async move {
        let [start_cap] = caps.try_into().unwrap();

        if !is_active_worker {
            return;
        }

        let filter = match ObjectFilter::try_new(filter) {
            Ok(filter) => filter,
            Err(err) => {
                tracing::warn!(?err, "failed to create filter");
                start_handle.give(&start_cap, Err(StorageErrorXKind::generic(err).into()));
                return;
            }
        };

        let work = source.list().await.context("list");
        match work {
            Ok(objects) => {
                let (include, exclude): (Vec<_>, Vec<_>) = objects
                    .into_iter()
                    .partition(|(o, _checksum)| filter.filter::<S>(o));
                tracing::info!(%worker_id, ?include, ?exclude, "listed objects");
                if include.is_empty() {
                    let err = StorageErrorXKind::NoMatchingFiles.with_context("discover");
                    start_handle.give(&start_cap, Err(err));
                    return;
                }
                include
                    .into_iter()
                    .for_each(|object| start_handle.give(&start_cap, Ok(object)))
            }
            Err(err) => {
                tracing::warn!(?err, "failed to list oneshot source");
                start_handle.give(&start_cap, Err(err))
            }
        }
    });

    (start_stream, shutdown.press_on_drop())
}

pub fn render_split_work<'scope, T, S, F>(
    scope: Scope<'scope, T>,
    collection_id: GlobalId,
    objects: StreamVec<'scope, T, Result<(S::Object, S::Checksum), StorageErrorX>>,
    source: S,
    format: F,
) -> (
    StreamVec<'scope, T, Result<F::WorkRequest<S>, StorageErrorX>>,
    PressOnDropButton,
)
where
    T: timely::progress::Timestamp,
    S: OneshotSource + Send + Sync + 'static,
    F: OneshotFormat + Send + Sync + 'static,
{
    let worker_id = scope.index();
    let mut builder = AsyncOperatorBuilder::new("CopyFrom-split_work".to_string(), scope.clone());

    let (request_handle, request_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
    let mut objects_handle = builder.new_input_for(objects, Distribute, &request_handle);

    let shutdown = builder.build(move |caps| async move {
        let [_objects_cap] = caps.try_into().unwrap();

        info!(%collection_id, %worker_id, "CopyFrom Split Work");

        while let Some(event) = objects_handle.next().await {
            let (capability, maybe_objects) = match event {
                AsyncEvent::Data(cap, req) => (cap, req),
                AsyncEvent::Progress(_) => continue,
            };

            // Wrap the splitting in an async block so we can use the `?` operator.
            let result = async {
                // Aggregate all of our work requests.
                let mut requests = Vec::new();

                for maybe_object in maybe_objects {
                    // Return early on error.
                    let (object, checksum) = maybe_object?;

                    // Spawn a task to split the object into individual work requests.
                    let format_ = format.clone();
                    let source_ = source.clone();
                    let work_requests = mz_ore::task::spawn(|| "split-work", async move {
                        info!(%worker_id, object = %object.name(), "splitting object");
                        format_.split_work(source_.clone(), object, checksum).await
                    })
                    .await
                    .expect("failed to spawn task")?;

                    requests.extend(work_requests);
                }

                Ok::<_, StorageErrorX>(requests)
            }
            .await
            .context("split");

            match result {
                Ok(requests) => requests
                    .into_iter()
                    .for_each(|req| request_handle.give(&capability, Ok(req))),
                Err(err) => request_handle.give(&capability, Err(err)),
            }
        }
    });

    (request_stream, shutdown.press_on_drop())
}

pub fn render_fetch_work<'scope, T, S, F>(
    scope: Scope<'scope, T>,
    collection_id: GlobalId,
    source: S,
    format: F,
    work_requests: StreamVec<'scope, T, Result<F::WorkRequest<S>, StorageErrorX>>,
) -> (
    StreamVec<'scope, T, Result<F::RecordChunk, StorageErrorX>>,
    PressOnDropButton,
)
where
    T: timely::progress::Timestamp,
    S: OneshotSource + Sync + 'static,
    F: OneshotFormat + Sync + 'static,
{
    let worker_id = scope.index();
    let mut builder = AsyncOperatorBuilder::new("CopyFrom-fetch_work".to_string(), scope.clone());

    let (record_handle, record_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
    let mut work_requests_handle = builder.new_input_for(work_requests, Distribute, &record_handle);

    let shutdown = builder.build(move |caps| async move {
        let [_work_cap] = caps.try_into().unwrap();

        info!(%collection_id, %worker_id, "CopyFrom Fetch Work");

        while let Some(event) = work_requests_handle.next().await {
            let (capability, maybe_requests) = match event {
                AsyncEvent::Data(cap, req) => (cap, req),
                AsyncEvent::Progress(_) => continue,
            };

            // Wrap the fetching in an async block so we can use the `?` operator.
            let result = async {
                for maybe_request in maybe_requests {
                    // Return early on error.
                    let request = maybe_request?;

                    // Fetch the request, emitting chunks of records as they stream in.
                    let mut work_stream = format.fetch_work(&source, request);
                    while let Some(result) = work_stream.next().await {
                        let record_chunk = result.context("fetch worker")?;

                        record_handle.give(&capability, Ok(record_chunk));
                    }
                }

                Ok::<_, StorageErrorX>(())
            }
            .await
            .context("fetch work");

            if let Err(err) = result {
                tracing::warn!(?err, "failed to fetch");
                record_handle.give(&capability, Err(err))
            }
        }
    });

    (record_stream, shutdown.press_on_drop())
}

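/// Renders the "decode" operator, which parses chunks of records into
/// [`Row`]s and then maps them through the source's MFP (map/filter/project)
/// plan.
///
/// A standalone sketch of the MFP evaluation idiom used in the operator body.
/// The `mfp` and `row` values are assumed to exist; the buffers are reused
/// across rows to avoid per-row allocations:
///
/// ```ignore
/// use mz_repr::{DatumVec, Row, RowArena};
///
/// let mut datum_vec = DatumVec::default();
/// let row_arena = RowArena::default();
/// let mut row_buf = Row::default();
///
/// // Unpack the Row into Datums, evaluate the MFP, and re-pack into `row_buf`.
/// let mut datums = datum_vec.borrow_with(&row);
/// match mfp.evaluate_into(&mut *datums, &row_arena, &mut row_buf) {
///     Ok(Some(row)) => println!("keep {row:?}"),
///     Ok(None) => println!("row was filtered out"),
///     Err(err) => println!("evaluation failed: {err}"),
/// }
/// ```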
pub fn render_decode_chunk<'scope, T, F>(
    scope: Scope<'scope, T>,
    format: F,
    record_chunks: StreamVec<'scope, T, Result<F::RecordChunk, StorageErrorX>>,
    mfp: SafeMfpPlan,
) -> (
    StreamVec<'scope, T, Result<Row, StorageErrorX>>,
    PressOnDropButton,
)
where
    T: timely::progress::Timestamp,
    F: OneshotFormat + 'static,
{
    let mut builder = AsyncOperatorBuilder::new("CopyFrom-decode_chunk".to_string(), scope.clone());

    let (row_handle, row_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
    let mut record_chunk_handle = builder.new_input_for(record_chunks, Distribute, &row_handle);

    let shutdown = builder.build(move |caps| async move {
        let [_row_cap] = caps.try_into().unwrap();

        // Reusable buffers for evaluating the MFP.
        let mut datum_vec = DatumVec::default();
        let row_arena = RowArena::default();
        let mut row_buf = Row::default();

        while let Some(event) = record_chunk_handle.next().await {
            let (capability, maybe_chunks) = match event {
                AsyncEvent::Data(cap, data) => (cap, data),
                AsyncEvent::Progress(_) => continue,
            };

            let result = async {
                let mut rows = Vec::new();
                for maybe_chunk in maybe_chunks {
                    let chunk = maybe_chunk?;
                    format.decode_chunk(chunk, &mut rows)?;
                }
                Ok::<_, StorageErrorX>(rows)
            }
            .await
            .context("decode chunk");

            match result {
                Ok(rows) => {
                    for row in rows {
                        // Map, filter, and project each Row before emitting it.
                        let mut datums = datum_vec.borrow_with(&row);
                        let result = mfp
                            .evaluate_into(&mut *datums, &row_arena, &mut row_buf)
                            .map(|row| row.cloned());

                        match result {
                            Ok(Some(row)) => row_handle.give(&capability, Ok(row)),
                            Ok(None) => {
                                // The MFP for a oneshot ingestion should only re-shape
                                // data, never filter it out.
                                mz_ore::soft_panic_or_log!("oneshot source MFP filtered out data!");
                            }
                            Err(e) => {
                                let err = StorageErrorXKind::MfpEvalError(e.to_string().into())
                                    .with_context("decode");
                                row_handle.give(&capability, Err(err))
                            }
                        }
                    }
                }
                Err(err) => row_handle.give(&capability, Err(err)),
            }
        }
    });

    (row_stream, shutdown.press_on_drop())
}

pub fn render_stage_batches_operator<'scope, T>(
    scope: Scope<'scope, T>,
    collection_id: GlobalId,
    collection_meta: &CollectionMetadata,
    persist_clients: Arc<PersistClientCache>,
    rows_stream: StreamVec<'scope, T, Result<Row, StorageErrorX>>,
) -> (
    StreamVec<'scope, T, Result<ProtoBatch, StorageErrorX>>,
    PressOnDropButton,
)
where
    T: timely::progress::Timestamp,
{
    let persist_location = collection_meta.persist_location.clone();
    let shard_id = collection_meta.data_shard;
    let collection_desc = Arc::new(collection_meta.relation_desc.clone());

    let mut builder =
        AsyncOperatorBuilder::new("CopyFrom-stage_batches".to_string(), scope.clone());

    let (proto_batch_handle, proto_batch_stream) =
        builder.new_output::<CapacityContainerBuilder<_>>();
    let mut rows_handle = builder.new_input_for(rows_stream, Pipeline, &proto_batch_handle);

    let shutdown = builder.build(move |caps| async move {
        let [proto_batch_cap] = caps.try_into().unwrap();

        // Open a Persist handle that we can use to stage a batch for this shard.
        let persist_client = persist_clients
            .open(persist_location)
            .await
            .expect("failed to open Persist client");
        let persist_diagnostics = Diagnostics {
            shard_name: collection_id.to_string(),
            handle_purpose: "CopyFrom::stage_batches".to_string(),
        };
        let write_handle = persist_client
            .open_writer::<SourceData, (), mz_repr::Timestamp, StorageDiff>(
                shard_id,
                Arc::clone(&collection_desc),
                Arc::new(UnitSchema),
                persist_diagnostics,
            )
            .await
            .expect("could not open Persist shard");

        // Stage the batch at a placeholder timestamp; the batch isn't appended
        // here, that's the caller's responsibility.
        let lower = mz_repr::Timestamp::MIN;
        let upper = Antichain::from_elem(lower.step_forward());

        let mut batch_builder = write_handle.builder(Antichain::from_elem(lower));

        while let Some(event) = rows_handle.next().await {
            let AsyncEvent::Data(_, row_batch) = event else {
                continue;
            };

            for maybe_row in row_batch {
                // Ensure the Row matches the schema of the destination collection.
                let maybe_row = maybe_row.and_then(|row| {
                    Row::validate(&row, &*collection_desc).map_err(|e| {
                        StorageErrorXKind::invalid_record_batch(e).with_context("stage_batches")
                    })?;
                    Ok(row)
                });
                match maybe_row {
                    Ok(row) => {
                        let data = SourceData(Ok(row));
                        batch_builder
                            .add(&data, &(), &lower, &1)
                            .await
                            .expect("failed to add Row to batch");
                    }
                    Err(err) => {
                        // On error, clean up the in-progress batch and report the
                        // failure downstream.
                        let batch = batch_builder
                            .finish(upper)
                            .await
                            .expect("failed to cleanup batch");
                        batch.delete().await;

                        proto_batch_handle
                            .give(&proto_batch_cap, Err(err).context("stage batches"));
                        return;
                    }
                }
            }
        }

        let batch = batch_builder
            .finish(upper)
            .await
            .expect("failed to create Batch");

        // Turn the staged batch into a serializable form that can be reported
        // back to whoever performs the final append.
        let proto_batch = batch.into_transmittable_batch();
        proto_batch_handle.give(&proto_batch_cap, Ok(proto_batch));
    });

    (proto_batch_stream, shutdown.press_on_drop())
}

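/// Renders the "completion" operator, which reduces the result stream down to
/// a single `Result` and hands it to `worker_callback` exactly once.
///
/// A sketch of a callback a caller might provide. The channel is an
/// assumption for illustration; any `FnOnce` with this signature works:
///
/// ```ignore
/// let (tx, rx) = std::sync::mpsc::channel();
/// let worker_callback = move |result: Result<Option<ProtoBatch>, String>| {
///     match &result {
///         // `Some` if this worker staged a batch, `None` if it saw no results.
///         Ok(Some(_batch)) => tracing::info!("staged a batch"),
///         Ok(None) => tracing::info!("no batch staged on this worker"),
///         Err(err) => tracing::warn!(%err, "oneshot ingestion failed"),
///     }
///     let _ = tx.send(result);
/// };
/// ```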
pub fn render_completion_operator<'scope, T, F>(
    scope: Scope<'scope, T>,
    results_stream: StreamVec<'scope, T, Result<ProtoBatch, StorageErrorX>>,
    worker_callback: F,
) where
    T: timely::progress::Timestamp,
    F: FnOnce(Result<Option<ProtoBatch>, String>) -> () + 'static,
{
    let mut builder = AsyncOperatorBuilder::new("CopyFrom-completion".to_string(), scope.clone());
    let mut results_input = builder.new_disconnected_input(results_stream, Pipeline);

    builder.build(move |_| async move {
        let result = async move {
            let mut maybe_payload: Option<ProtoBatch> = None;

            while let Some(event) = results_input.next().await {
                if let AsyncEvent::Data(_cap, results) = event {
                    let [result] = results
                        .try_into()
                        .expect("only 1 event on the result stream");

                    // The upstream operator should only ever emit a single batch.
                    if maybe_payload.is_some() {
                        panic!("expected only one batch!");
                    }

                    maybe_payload = Some(result.map_err(|e| e.to_string())?);
                }
            }

            Ok(maybe_payload)
        }
        .await;

        // Report the result of the ingestion to the caller, exactly once.
        worker_callback(result);
    });
}

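/// An object (e.g. a file) that was discovered at a [`OneshotSource`].
///
/// A minimal sketch of an implementation for a hypothetical in-memory source,
/// to show what each method is expected to return. `InMemoryObject` is not a
/// real type in this crate:
///
/// ```ignore
/// #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
/// struct InMemoryObject {
///     name: String,
///     data: Vec<u8>,
/// }
///
/// impl OneshotObject for InMemoryObject {
///     fn name(&self) -> &str {
///         &self.name
///     }
///     fn path(&self) -> &str {
///         // No directory structure, so the path is just the name.
///         &self.name
///     }
///     fn size(&self) -> usize {
///         self.data.len()
///     }
///     fn encodings(&self) -> &[Encoding] {
///         // The data is stored uncompressed.
///         &[]
///     }
/// }
/// ```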
pub trait OneshotObject {
    /// Name of the object, including any extensions.
    fn name(&self) -> &str;

    /// Path of the object within the source.
    fn path(&self) -> &str;

    /// Size of the object in bytes.
    fn size(&self) -> usize;

    /// Encodings (e.g. compression) applied to the object.
    fn encodings(&self) -> &[Encoding];
}

/// An encoding (e.g. compression) that can be applied to an object.
#[derive(Debug, Copy, Clone, Serialize, Deserialize)]
pub enum Encoding {
    Bzip2,
    Gzip,
    Xz,
    Zstd,
}

/// A source that "oneshot" ingestions (e.g. `COPY FROM`) can read from.
pub trait OneshotSource: Clone + Send + Unpin {
    /// An individual unit within the source, e.g. a file.
    type Object: OneshotObject
        + Debug
        + Clone
        + Send
        + Unpin
        + Serialize
        + DeserializeOwned
        + 'static;
    /// Checksum for a [`Self::Object`].
    type Checksum: Debug + Clone + Send + Unpin + Serialize + DeserializeOwned + 'static;

    /// Lists all of the objects available at this source.
    fn list<'a>(
        &'a self,
    ) -> impl Future<Output = Result<Vec<(Self::Object, Self::Checksum)>, StorageErrorX>> + Send;

    /// Returns a stream of the bytes for the specified object, optionally
    /// limited to `range`.
    fn get<'s>(
        &'s self,
        object: Self::Object,
        checksum: Self::Checksum,
        range: Option<std::ops::RangeInclusive<usize>>,
    ) -> BoxStream<'s, Result<Bytes, StorageErrorX>>;
}

/// An enum wrapper around [`OneshotSource`]s.
///
/// Using an enum (instead of a trait object) lets us render a single dataflow
/// that statically dispatches to whichever source was requested.
#[derive(Clone, Debug)]
pub(crate) enum SourceKind {
    Http(HttpOneshotSource),
    AwsS3(AwsS3Source),
}

impl OneshotSource for SourceKind {
    type Object = ObjectKind;
    type Checksum = ChecksumKind;

    async fn list<'a>(&'a self) -> Result<Vec<(Self::Object, Self::Checksum)>, StorageErrorX> {
        match self {
            SourceKind::Http(http) => {
                let objects = http.list().await.context("http")?;
                let objects = objects
                    .into_iter()
                    .map(|(object, checksum)| {
                        (ObjectKind::Http(object), ChecksumKind::Http(checksum))
                    })
                    .collect();
                Ok(objects)
            }
            SourceKind::AwsS3(s3) => {
                let objects = s3.list().await.context("s3")?;
                let objects = objects
                    .into_iter()
                    .map(|(object, checksum)| {
                        (ObjectKind::AwsS3(object), ChecksumKind::AwsS3(checksum))
                    })
                    .collect();
                Ok(objects)
            }
        }
    }

    fn get<'s>(
        &'s self,
        object: Self::Object,
        checksum: Self::Checksum,
        range: Option<std::ops::RangeInclusive<usize>>,
    ) -> BoxStream<'s, Result<Bytes, StorageErrorX>> {
        match (self, object, checksum) {
            (SourceKind::Http(http), ObjectKind::Http(object), ChecksumKind::Http(checksum)) => {
                http.get(object, checksum, range)
                    .map(|result| result.context("http"))
                    .boxed()
            }
            (SourceKind::AwsS3(s3), ObjectKind::AwsS3(object), ChecksumKind::AwsS3(checksum)) => s3
                .get(object, checksum, range)
                .map(|result| result.context("aws_s3"))
                .boxed(),
            (SourceKind::AwsS3(_) | SourceKind::Http(_), _, _) => {
                unreachable!("programming error! wrong source, object, and checksum kind");
            }
        }
    }
}

/// Enum wrapper around the objects of our supported sources.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) enum ObjectKind {
    Http(HttpObject),
    AwsS3(S3Object),
}

impl OneshotObject for ObjectKind {
    fn name(&self) -> &str {
        match self {
            ObjectKind::Http(object) => object.name(),
            ObjectKind::AwsS3(object) => object.name(),
        }
    }

    fn path(&self) -> &str {
        match self {
            ObjectKind::Http(object) => object.path(),
            ObjectKind::AwsS3(object) => object.path(),
        }
    }

    fn size(&self) -> usize {
        match self {
            ObjectKind::Http(object) => object.size(),
            ObjectKind::AwsS3(object) => object.size(),
        }
    }

    fn encodings(&self) -> &[Encoding] {
        match self {
            ObjectKind::Http(object) => object.encodings(),
            ObjectKind::AwsS3(object) => object.encodings(),
        }
    }
}

/// Enum wrapper around the checksums of our supported sources.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) enum ChecksumKind {
    Http(HttpChecksum),
    AwsS3(S3Checksum),
}

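/// A format (e.g. CSV, Parquet) that "oneshot" ingestions can decode.
///
/// A sketch of how the three methods fit together for a single object. The
/// `format`, `source`, `object`, and `checksum` values are assumed to exist;
/// in the real dataflow each stage runs in a separate operator, possibly on a
/// different worker:
///
/// ```ignore
/// // 1. Split the object into individually fetchable work requests.
/// let requests = format.split_work(source.clone(), object, checksum).await?;
///
/// let mut rows = Vec::new();
/// for request in requests {
///     // 2. Fetch each request as a stream of record chunks.
///     let mut chunks = format.fetch_work(&source, request);
///     while let Some(chunk) = chunks.next().await {
///         // 3. Decode each chunk into Rows.
///         format.decode_chunk(chunk?, &mut rows)?;
///     }
/// }
/// ```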
pub trait OneshotFormat: Clone {
    /// A single unit of work for fetching data in this format.
    type WorkRequest<S>: Debug + Clone + Send + Serialize + DeserializeOwned + 'static
    where
        S: OneshotSource;
    /// A chunk of records (e.g. a Parquet `RowGroup`) that can be decoded into `Row`s.
    type RecordChunk: Debug + Clone + Send + Serialize + DeserializeOwned + 'static;

    /// Splits an object into units of work that can be fetched and decoded
    /// independently, e.g. in parallel across workers.
    fn split_work<S: OneshotSource + Send>(
        &self,
        source: S,
        object: S::Object,
        checksum: S::Checksum,
    ) -> impl Future<Output = Result<Vec<Self::WorkRequest<S>>, StorageErrorX>> + Send;

    /// Fetches a single unit of work from `source`, returning a stream of
    /// record chunks.
    fn fetch_work<'a, S: OneshotSource + Sync + 'static>(
        &'a self,
        source: &'a S,
        request: Self::WorkRequest<S>,
    ) -> BoxStream<'a, Result<Self::RecordChunk, StorageErrorX>>;

    /// Decodes a chunk of records into `rows`, returning the number of rows
    /// decoded.
    fn decode_chunk(
        &self,
        chunk: Self::RecordChunk,
        rows: &mut Vec<Row>,
    ) -> Result<usize, StorageErrorX>;
}

/// An enum wrapper around [`OneshotFormat`]s.
///
/// Like [`SourceKind`], this lets us render a single dataflow that statically
/// dispatches to whichever format was requested.
#[derive(Clone, Debug)]
pub(crate) enum FormatKind {
    Csv(CsvDecoder),
    Parquet(ParquetFormat),
}

impl OneshotFormat for FormatKind {
    type WorkRequest<S>
        = RequestKind<S::Object, S::Checksum>
    where
        S: OneshotSource;
    type RecordChunk = RecordChunkKind;

    async fn split_work<S: OneshotSource + Send>(
        &self,
        source: S,
        object: S::Object,
        checksum: S::Checksum,
    ) -> Result<Vec<Self::WorkRequest<S>>, StorageErrorX> {
        match self {
            FormatKind::Csv(csv) => {
                let work = csv
                    .split_work(source, object, checksum)
                    .await
                    .context("csv")?
                    .into_iter()
                    .map(RequestKind::Csv)
                    .collect();
                Ok(work)
            }
            FormatKind::Parquet(parquet) => {
                let work = parquet
                    .split_work(source, object, checksum)
                    .await
                    .context("parquet")?
                    .into_iter()
                    .map(RequestKind::Parquet)
                    .collect();
                Ok(work)
            }
        }
    }

    fn fetch_work<'a, S: OneshotSource + Sync + 'static>(
        &'a self,
        source: &'a S,
        request: Self::WorkRequest<S>,
    ) -> BoxStream<'a, Result<Self::RecordChunk, StorageErrorX>> {
        match (self, request) {
            (FormatKind::Csv(csv), RequestKind::Csv(request)) => csv
                .fetch_work(source, request)
                .map_ok(RecordChunkKind::Csv)
                .map(|result| result.context("csv"))
                .boxed(),
            (FormatKind::Parquet(parquet), RequestKind::Parquet(request)) => parquet
                .fetch_work(source, request)
                .map_ok(RecordChunkKind::Parquet)
                .map(|result| result.context("parquet"))
                .boxed(),
            (FormatKind::Parquet(_), RequestKind::Csv(_))
            | (FormatKind::Csv(_), RequestKind::Parquet(_)) => {
                unreachable!("programming error, {self:?}")
            }
        }
    }

    fn decode_chunk(
        &self,
        chunk: Self::RecordChunk,
        rows: &mut Vec<Row>,
    ) -> Result<usize, StorageErrorX> {
        match (self, chunk) {
            (FormatKind::Csv(csv), RecordChunkKind::Csv(chunk)) => {
                csv.decode_chunk(chunk, rows).context("csv")
            }
            (FormatKind::Parquet(parquet), RecordChunkKind::Parquet(chunk)) => {
                parquet.decode_chunk(chunk, rows).context("parquet")
            }
            (FormatKind::Parquet(_), RecordChunkKind::Csv(_))
            | (FormatKind::Csv(_), RecordChunkKind::Parquet(_)) => {
                unreachable!("programming error, {self:?}")
            }
        }
    }
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub(crate) enum RequestKind<O, C> {
    Csv(CsvWorkRequest<O, C>),
    Parquet(ParquetWorkRequest<O, C>),
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub(crate) enum RecordChunkKind {
    Csv(CsvRecord),
    Parquet(ParquetRowGroup),
}

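/// Filter that determines which discovered objects get ingested.
///
/// A small sketch of how each variant behaves. The file names are made up,
/// and the exact `ContentFilter` payloads are inferred from how `try_new`
/// consumes them:
///
/// ```ignore
/// // `None` admits everything.
/// let all = ObjectFilter::try_new(ContentFilter::None)?;
///
/// // `Files` admits an explicit set of paths.
/// let files = ObjectFilter::try_new(ContentFilter::Files(vec![
///     "data/part-1.csv".to_string(),
/// ]))?;
///
/// // `Pattern` admits anything matching a glob.
/// let pattern = ObjectFilter::try_new(ContentFilter::Pattern("data/*.csv".into()))?;
/// ```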
pub(crate) enum ObjectFilter {
    None,
    Files(BTreeSet<Box<str>>),
    Pattern(glob::Pattern),
}

impl ObjectFilter {
    pub fn try_new(filter: ContentFilter) -> Result<Self, anyhow::Error> {
        match filter {
            ContentFilter::None => Ok(ObjectFilter::None),
            ContentFilter::Files(files) => {
                let files = files.into_iter().map(|f| f.into()).collect();
                Ok(ObjectFilter::Files(files))
            }
            ContentFilter::Pattern(pattern) => {
                let pattern = glob::Pattern::new(&pattern)?;
                Ok(ObjectFilter::Pattern(pattern))
            }
        }
    }

    /// Returns whether the object should be included in the ingestion.
    pub fn filter<S: OneshotSource>(&self, object: &S::Object) -> bool {
        match self {
            ObjectFilter::None => true,
            ObjectFilter::Files(files) => files.contains(object.path()),
            ObjectFilter::Pattern(pattern) => pattern.matches(object.path()),
        }
    }
}

/// Error type for oneshot sources that carries a chain of context, similar to
/// `anyhow::Error`, but serializable so it can be sent between workers.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct StorageErrorX {
    kind: StorageErrorXKind,
    context: LinkedList<String>,
}

impl fmt::Display for StorageErrorX {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        writeln!(f, "error: {}", self.kind)?;
        writeln!(f, "causes: {:?}", self.context)?;
        Ok(())
    }
}

#[derive(Debug, Clone, Serialize, Deserialize, thiserror::Error)]
pub enum StorageErrorXKind {
    #[error("csv decoding error: {0}")]
    CsvDecoding(Arc<str>),
    #[error("parquet error: {0}")]
    ParquetError(Arc<str>),
    #[error("reqwest error: {0}")]
    Reqwest(Arc<str>),
    #[error("aws s3 request error: {0}")]
    AwsS3Request(String),
    #[error("aws s3 bytestream error: {0}")]
    AwsS3Bytes(Arc<str>),
    #[error("invalid reqwest header: {0}")]
    InvalidHeader(Arc<str>),
    #[error("failed to decode Row from a record batch: {0}")]
    InvalidRecordBatch(Arc<str>),
    #[error("programming error: {0}")]
    ProgrammingError(Arc<str>),
    #[error("failed to get the size of an object")]
    MissingSize,
    #[error("object is missing the required '{0}' field")]
    MissingField(Arc<str>),
    #[error("failed while evaluating the provided mfp: '{0}'")]
    MfpEvalError(Arc<str>),
    #[error("no matching files found at the given url")]
    NoMatchingFiles,
    #[error("something went wrong: {0}")]
    Generic(String),
}

impl From<csv_async::Error> for StorageErrorXKind {
    fn from(err: csv_async::Error) -> Self {
        StorageErrorXKind::CsvDecoding(err.to_string().into())
    }
}

impl From<reqwest::Error> for StorageErrorXKind {
    fn from(err: reqwest::Error) -> Self {
        StorageErrorXKind::Reqwest(err.to_string().into())
    }
}

impl From<reqwest::header::ToStrError> for StorageErrorXKind {
    fn from(err: reqwest::header::ToStrError) -> Self {
        StorageErrorXKind::InvalidHeader(err.to_string().into())
    }
}

impl From<aws_smithy_types::byte_stream::error::Error> for StorageErrorXKind {
    fn from(err: aws_smithy_types::byte_stream::error::Error) -> Self {
        StorageErrorXKind::AwsS3Bytes(DisplayErrorContext(err).to_string().into())
    }
}

impl From<::parquet::errors::ParquetError> for StorageErrorXKind {
    fn from(err: ::parquet::errors::ParquetError) -> Self {
        StorageErrorXKind::ParquetError(err.to_string().into())
    }
}

impl StorageErrorXKind {
    pub fn with_context<C: Display>(self, context: C) -> StorageErrorX {
        StorageErrorX {
            kind: self,
            context: LinkedList::from([context.to_string()]),
        }
    }

    pub fn invalid_record_batch<S: Into<Arc<str>>>(error: S) -> StorageErrorXKind {
        StorageErrorXKind::InvalidRecordBatch(error.into())
    }

    pub fn generic<C: Display>(error: C) -> StorageErrorXKind {
        StorageErrorXKind::Generic(error.to_string())
    }

    pub fn programming_error<S: Into<Arc<str>>>(error: S) -> StorageErrorXKind {
        StorageErrorXKind::ProgrammingError(error.into())
    }
}

impl<E> From<E> for StorageErrorX
where
    E: Into<StorageErrorXKind>,
{
    fn from(err: E) -> Self {
        StorageErrorX {
            kind: err.into(),
            context: LinkedList::new(),
        }
    }
}

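/// Extension trait for attaching human-readable context to errors, in the
/// style of `anyhow::Context`.
///
/// A small sketch of how the context chain accumulates; the error values
/// here are fabricated for illustration:
///
/// ```ignore
/// let result: Result<(), StorageErrorXKind> =
///     Err(StorageErrorXKind::generic("connection reset"));
///
/// // Each `.context(..)` call appends another entry to the chain, so the
/// // Display output of the final error is:
/// //   error: something went wrong: connection reset
/// //   causes: ["fetch worker", "fetch work"]
/// let err = result.context("fetch worker").context("fetch work").unwrap_err();
/// ```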
trait StorageErrorXContext<T> {
    fn context<C>(self, context: C) -> Result<T, StorageErrorX>
    where
        C: Display;
}

impl<T, E> StorageErrorXContext<T> for Result<T, E>
where
    E: Into<StorageErrorXKind>,
{
    fn context<C>(self, context: C) -> Result<T, StorageErrorX>
    where
        C: Display,
    {
        match self {
            Ok(val) => Ok(val),
            Err(kind) => Err(StorageErrorX {
                kind: kind.into(),
                context: LinkedList::from([context.to_string()]),
            }),
        }
    }
}

impl<T> StorageErrorXContext<T> for Result<T, StorageErrorX> {
    fn context<C>(self, context: C) -> Result<T, StorageErrorX>
    where
        C: Display,
    {
        match self {
            Ok(val) => Ok(val),
            Err(mut e) => {
                e.context.push_back(context.to_string());
                Err(e)
            }
        }
    }
}