use std::collections::{BTreeSet, LinkedList};
use std::fmt;
use std::fmt::{Debug, Display};
use std::future::Future;
use std::sync::Arc;

use aws_sdk_s3::error::DisplayErrorContext;
use bytes::Bytes;
use differential_dataflow::Hashable;
use futures::stream::BoxStream;
use futures::{StreamExt, TryStreamExt};
use mz_expr::SafeMfpPlan;
use mz_ore::cast::CastFrom;
use mz_persist_client::Diagnostics;
use mz_persist_client::batch::ProtoBatch;
use mz_persist_client::cache::PersistClientCache;
use mz_persist_types::Codec;
use mz_persist_types::codec_impls::UnitSchema;
use mz_repr::{DatumVec, GlobalId, Row, RowArena, Timestamp};
use mz_storage_types::StorageDiff;
use mz_storage_types::connections::ConnectionContext;
use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::oneshot_sources::{
    ContentFilter, ContentFormat, ContentSource, OneshotIngestionRequest,
};
use mz_storage_types::sources::SourceData;
use mz_timely_util::builder_async::{
    Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
};
use mz_timely_util::pact::Distribute;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use timely::container::CapacityContainerBuilder;
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::{Scope, StreamVec};
use timely::progress::Antichain;
use tracing::info;

use crate::oneshot_source::aws_source::{AwsS3Source, S3Checksum, S3Object};
use crate::oneshot_source::csv::{CsvDecoder, CsvRecord, CsvWorkRequest};
use crate::oneshot_source::http_source::{HttpChecksum, HttpObject, HttpOneshotSource};
use crate::oneshot_source::parquet::{ParquetFormat, ParquetRowGroup, ParquetWorkRequest};

pub mod csv;
pub mod parquet;

pub mod aws_source;
pub mod http_source;

mod util;

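/// Renders a dataflow that runs a "oneshot" ingestion, e.g. a `COPY FROM`.
///
/// Roughly, the dataflow is composed of the following operators:
///
/// 1. Discover the objects available in `source` (single worker).
/// 2. Split each object into individually fetchable units of work.
/// 3. Fetch each unit of work, yielding chunks of records.
/// 4. Decode each chunk into [`Row`]s and apply the source MFP.
/// 5. Stage all of the [`Row`]s into a single Persist batch.
/// 6. Report the staged [`ProtoBatch`] (or an error) via `worker_callback`.
///
/// The returned [`PressOnDropButton`]s shut down the dataflow when dropped.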
pub fn render<G, F>(
    scope: G,
    persist_clients: Arc<PersistClientCache>,
    connection_context: ConnectionContext,
    collection_id: GlobalId,
    collection_meta: CollectionMetadata,
    request: OneshotIngestionRequest,
    worker_callback: F,
) -> Vec<PressOnDropButton>
where
    G: Scope<Timestamp = Timestamp>,
    F: FnOnce(Result<Option<ProtoBatch>, String>) + 'static,
{
    let OneshotIngestionRequest {
        source,
        format,
        filter,
        shape,
    } = request;

    let source = match source {
        ContentSource::Http { url } => {
            let source = HttpOneshotSource::new(reqwest::Client::default(), url);
            SourceKind::Http(source)
        }
        ContentSource::AwsS3 {
            connection,
            connection_id,
            uri,
        } => {
            // Checksum validation is only disabled when the S3 endpoint is
            // overridden (i.e. we are not talking to AWS itself) and the
            // format is Parquet.
            let endpoint_overridden = connection
                .endpoint
                .as_ref()
                .is_some_and(|endpoint| !endpoint.contains("amazonaws"));
            let use_checksum = !(endpoint_overridden && matches!(format, ContentFormat::Parquet));
            if !use_checksum {
                tracing::info!(
                    "disabling checksum validation for S3 source because endpoint {:?} is overridden and format is Parquet",
                    connection.endpoint
                );
            }
            let source = AwsS3Source::new(
                connection,
                connection_id,
                connection_context,
                uri,
                use_checksum,
            );
            SourceKind::AwsS3(source)
        }
    };
    tracing::info!(?source, "created oneshot source");

    let format = match format {
        ContentFormat::Csv(params) => {
            let format = CsvDecoder::new(params, &shape.source_desc);
            FormatKind::Csv(format)
        }
        ContentFormat::Parquet => {
            let format = ParquetFormat::new(shape.source_desc);
            FormatKind::Parquet(format)
        }
    };

    let (objects_stream, discover_token) =
        render_discover_objects(scope.clone(), collection_id, source.clone(), filter);
    let (work_stream, split_token) = render_split_work(
        scope.clone(),
        collection_id,
        objects_stream,
        source.clone(),
        format.clone(),
    );
    let (records_stream, fetch_token) = render_fetch_work(
        scope.clone(),
        collection_id,
        source.clone(),
        format.clone(),
        work_stream,
    );
    let (rows_stream, decode_token) = render_decode_chunk(
        scope.clone(),
        format.clone(),
        records_stream,
        shape.source_mfp,
    );
    let (batch_stream, batch_token) = render_stage_batches_operator(
        scope.clone(),
        collection_id,
        &collection_meta,
        persist_clients,
        rows_stream,
    );

    render_completion_operator(scope, batch_stream, worker_callback);

    let tokens = vec![
        discover_token,
        split_token,
        fetch_token,
        decode_token,
        batch_token,
    ];

    tokens
}

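/// Renders an operator that lists the objects available in `source` and
/// applies `filter`, emitting the matching objects downstream.
///
/// Only a single worker, chosen by hashing `collection_id`, performs the
/// listing.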
pub fn render_discover_objects<G, S>(
    scope: G,
    collection_id: GlobalId,
    source: S,
    filter: ContentFilter,
) -> (
    StreamVec<G, Result<(S::Object, S::Checksum), StorageErrorX>>,
    PressOnDropButton,
)
where
    G: Scope<Timestamp = Timestamp>,
    S: OneshotSource + 'static,
{
    let worker_id = scope.index();
    let num_workers = scope.peers();
    let active_worker_id = usize::cast_from((collection_id, "discover").hashed()) % num_workers;
    let is_active_worker = worker_id == active_worker_id;

    let mut builder = AsyncOperatorBuilder::new("CopyFrom-discover".to_string(), scope.clone());

    let (start_handle, start_stream) = builder.new_output::<CapacityContainerBuilder<_>>();

    let shutdown = builder.build(move |caps| async move {
        let [start_cap] = caps.try_into().unwrap();

        if !is_active_worker {
            return;
        }

        let filter = match ObjectFilter::try_new(filter) {
            Ok(filter) => filter,
            Err(err) => {
                tracing::warn!(?err, "failed to create filter");
                start_handle.give(&start_cap, Err(StorageErrorXKind::generic(err).into()));
                return;
            }
        };

        let work = source.list().await.context("list");
        match work {
            Ok(objects) => {
                let (include, exclude): (Vec<_>, Vec<_>) = objects
                    .into_iter()
                    .partition(|(o, _checksum)| filter.filter::<S>(o));
                tracing::info!(%worker_id, ?include, ?exclude, "listed objects");
                if include.is_empty() {
                    let err = StorageErrorXKind::NoMatchingFiles.with_context("discover");
                    start_handle.give(&start_cap, Err(err));
                    return;
                }
                include
                    .into_iter()
                    .for_each(|object| start_handle.give(&start_cap, Ok(object)))
            }
            Err(err) => {
                tracing::warn!(?err, "failed to list oneshot source");
                start_handle.give(&start_cap, Err(err))
            }
        }
    });

    (start_stream, shutdown.press_on_drop())
}

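/// Renders an operator that splits each discovered object into individually
/// fetchable units of work, as defined by `format`.
///
/// Objects are distributed among workers, and splitting runs in a spawned
/// task.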
pub fn render_split_work<G, S, F>(
    scope: G,
    collection_id: GlobalId,
    objects: StreamVec<G, Result<(S::Object, S::Checksum), StorageErrorX>>,
    source: S,
    format: F,
) -> (
    StreamVec<G, Result<F::WorkRequest<S>, StorageErrorX>>,
    PressOnDropButton,
)
where
    G: Scope,
    S: OneshotSource + Send + Sync + 'static,
    F: OneshotFormat + Send + Sync + 'static,
{
    let worker_id = scope.index();
    let mut builder = AsyncOperatorBuilder::new("CopyFrom-split_work".to_string(), scope.clone());

    let (request_handle, request_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
    let mut objects_handle = builder.new_input_for(objects, Distribute, &request_handle);

    let shutdown = builder.build(move |caps| async move {
        let [_objects_cap] = caps.try_into().unwrap();

        info!(%collection_id, %worker_id, "CopyFrom Split Work");

        while let Some(event) = objects_handle.next().await {
            let (capability, maybe_objects) = match event {
                AsyncEvent::Data(cap, req) => (cap, req),
                AsyncEvent::Progress(_) => continue,
            };

            let result = async {
                let mut requests = Vec::new();

                for maybe_object in maybe_objects {
                    let (object, checksum) = maybe_object?;

                    let format_ = format.clone();
                    let source_ = source.clone();
                    let work_requests = mz_ore::task::spawn(|| "split-work", async move {
                        info!(%worker_id, object = %object.name(), "splitting object");
                        format_.split_work(source_.clone(), object, checksum).await
                    })
                    .await?;

                    requests.extend(work_requests);
                }

                Ok::<_, StorageErrorX>(requests)
            }
            .await
            .context("split");

            match result {
                Ok(requests) => requests
                    .into_iter()
                    .for_each(|req| request_handle.give(&capability, Ok(req))),
                Err(err) => request_handle.give(&capability, Err(err)),
            }
        }
    });

    (request_stream, shutdown.press_on_drop())
}

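/// Renders an operator that fetches individual units of work from `source`,
/// emitting chunks of records in the format's native representation.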
pub fn render_fetch_work<G, S, F>(
    scope: G,
    collection_id: GlobalId,
    source: S,
    format: F,
    work_requests: StreamVec<G, Result<F::WorkRequest<S>, StorageErrorX>>,
) -> (
    StreamVec<G, Result<F::RecordChunk, StorageErrorX>>,
    PressOnDropButton,
)
where
    G: Scope,
    S: OneshotSource + Sync + 'static,
    F: OneshotFormat + Sync + 'static,
{
    let worker_id = scope.index();
    let mut builder = AsyncOperatorBuilder::new("CopyFrom-fetch_work".to_string(), scope.clone());

    let (record_handle, record_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
    let mut work_requests_handle = builder.new_input_for(work_requests, Distribute, &record_handle);

    let shutdown = builder.build(move |caps| async move {
        let [_work_cap] = caps.try_into().unwrap();

        info!(%collection_id, %worker_id, "CopyFrom Fetch Work");

        while let Some(event) = work_requests_handle.next().await {
            let (capability, maybe_requests) = match event {
                AsyncEvent::Data(cap, req) => (cap, req),
                AsyncEvent::Progress(_) => continue,
            };

            let result = async {
                for maybe_request in maybe_requests {
                    let request = maybe_request?;

                    let mut work_stream = format.fetch_work(&source, request);
                    while let Some(result) = work_stream.next().await {
                        let record_chunk = result.context("fetch worker")?;

                        record_handle.give(&capability, Ok(record_chunk));
                    }
                }

                Ok::<_, StorageErrorX>(())
            }
            .await
            .context("fetch work");

            if let Err(err) = result {
                tracing::warn!(?err, "failed to fetch");
                record_handle.give(&capability, Err(err))
            }
        }
    });

    (record_stream, shutdown.press_on_drop())
}

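/// Renders an operator that decodes chunks of records into [`Row`]s and then
/// maps them with the provided `mfp`.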
pub fn render_decode_chunk<G, F>(
    scope: G,
    format: F,
    record_chunks: StreamVec<G, Result<F::RecordChunk, StorageErrorX>>,
    mfp: SafeMfpPlan,
) -> (StreamVec<G, Result<Row, StorageErrorX>>, PressOnDropButton)
where
    G: Scope,
    F: OneshotFormat + 'static,
{
    let mut builder = AsyncOperatorBuilder::new("CopyFrom-decode_chunk".to_string(), scope.clone());

    let (row_handle, row_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
    let mut record_chunk_handle = builder.new_input_for(record_chunks, Distribute, &row_handle);

    let shutdown = builder.build(move |caps| async move {
        let [_row_cap] = caps.try_into().unwrap();

        let mut datum_vec = DatumVec::default();
        let row_arena = RowArena::default();
        let mut row_buf = Row::default();

        while let Some(event) = record_chunk_handle.next().await {
            let (capability, maybe_chunks) = match event {
                AsyncEvent::Data(cap, data) => (cap, data),
                AsyncEvent::Progress(_) => continue,
            };

            let result = async {
                let mut rows = Vec::new();
                for maybe_chunk in maybe_chunks {
                    let chunk = maybe_chunk?;
                    format.decode_chunk(chunk, &mut rows)?;
                }
                Ok::<_, StorageErrorX>(rows)
            }
            .await
            .context("decode chunk");

            match result {
                Ok(rows) => {
                    for row in rows {
                        let mut datums = datum_vec.borrow_with(&row);
                        let result = mfp
                            .evaluate_into(&mut *datums, &row_arena, &mut row_buf)
                            .map(|row| row.cloned());

                        match result {
                            Ok(Some(row)) => row_handle.give(&capability, Ok(row)),
                            Ok(None) => {
                                mz_ore::soft_panic_or_log!("oneshot source MFP filtered out data!");
                            }
                            Err(e) => {
                                let err = StorageErrorXKind::MfpEvalError(e.to_string().into())
                                    .with_context("decode");
                                row_handle.give(&capability, Err(err))
                            }
                        }
                    }
                }
                Err(err) => row_handle.give(&capability, Err(err)),
            }
        }
    });

    (row_stream, shutdown.press_on_drop())
}

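/// Renders an operator that writes all of the [`Row`]s it receives into a
/// single Persist batch, emitting the resulting [`ProtoBatch`] without
/// appending it to the shard.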
pub fn render_stage_batches_operator<G>(
    scope: G,
    collection_id: GlobalId,
    collection_meta: &CollectionMetadata,
    persist_clients: Arc<PersistClientCache>,
    rows_stream: StreamVec<G, Result<Row, StorageErrorX>>,
) -> (
    StreamVec<G, Result<ProtoBatch, StorageErrorX>>,
    PressOnDropButton,
)
where
    G: Scope,
{
    let persist_location = collection_meta.persist_location.clone();
    let shard_id = collection_meta.data_shard;
    let collection_desc = Arc::new(collection_meta.relation_desc.clone());

    let mut builder =
        AsyncOperatorBuilder::new("CopyFrom-stage_batches".to_string(), scope.clone());

    let (proto_batch_handle, proto_batch_stream) =
        builder.new_output::<CapacityContainerBuilder<_>>();
    let mut rows_handle = builder.new_input_for(rows_stream, Pipeline, &proto_batch_handle);

    let shutdown = builder.build(move |caps| async move {
        let [proto_batch_cap] = caps.try_into().unwrap();

        let persist_client = persist_clients
            .open(persist_location)
            .await
            .expect("failed to open Persist client");
        let persist_diagnostics = Diagnostics {
            shard_name: collection_id.to_string(),
            handle_purpose: "CopyFrom::stage_batches".to_string(),
        };
        let write_handle = persist_client
            .open_writer::<SourceData, (), mz_repr::Timestamp, StorageDiff>(
                shard_id,
                Arc::clone(&collection_desc),
                Arc::new(UnitSchema),
                persist_diagnostics,
            )
            .await
            .expect("could not open Persist shard");

        let lower = mz_repr::Timestamp::MIN;
        let upper = Antichain::from_elem(lower.step_forward());

        let mut batch_builder = write_handle.builder(Antichain::from_elem(lower));

        while let Some(event) = rows_handle.next().await {
            let AsyncEvent::Data(_, row_batch) = event else {
                continue;
            };

            for maybe_row in row_batch {
                let maybe_row = maybe_row.and_then(|row| {
                    Row::validate(&row, &*collection_desc).map_err(|e| {
                        StorageErrorXKind::invalid_record_batch(e).with_context("stage_batches")
                    })?;
                    Ok(row)
                });
                match maybe_row {
                    Ok(row) => {
                        let data = SourceData(Ok(row));
                        batch_builder
                            .add(&data, &(), &lower, &1)
                            .await
                            .expect("failed to add Row to batch");
                    }
                    Err(err) => {
                        let batch = batch_builder
                            .finish(upper)
                            .await
                            .expect("failed to cleanup batch");
                        batch.delete().await;

                        proto_batch_handle
                            .give(&proto_batch_cap, Err(err).context("stage batches"));
                        return;
                    }
                }
            }
        }

        let batch = batch_builder
            .finish(upper)
            .await
            .expect("failed to create Batch");

        let proto_batch = batch.into_transmittable_batch();
        proto_batch_handle.give(&proto_batch_cap, Ok(proto_batch));
    });

    (proto_batch_stream, shutdown.press_on_drop())
}

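/// Renders an operator that reports the final [`ProtoBatch`] (or an error)
/// back to the client via `worker_callback`.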
pub fn render_completion_operator<G, F>(
    scope: G,
    results_stream: StreamVec<G, Result<ProtoBatch, StorageErrorX>>,
    worker_callback: F,
) where
    G: Scope,
    F: FnOnce(Result<Option<ProtoBatch>, String>) + 'static,
{
    let mut builder = AsyncOperatorBuilder::new("CopyFrom-completion".to_string(), scope.clone());
    let mut results_input = builder.new_disconnected_input(results_stream, Pipeline);

    builder.build(move |_| async move {
        let result = async move {
            let mut maybe_payload: Option<ProtoBatch> = None;

            while let Some(event) = results_input.next().await {
                if let AsyncEvent::Data(_cap, results) = event {
                    let [result] = results
                        .try_into()
                        .expect("only 1 event on the result stream");

                    if maybe_payload.is_some() {
                        panic!("expected only one batch!");
                    }

                    maybe_payload = Some(result.map_err(|e| e.to_string())?);
                }
            }

            Ok(maybe_payload)
        }
        .await;

        worker_callback(result);
    });
}

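/// A single object, e.g. a file, provided by a [`OneshotSource`].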
pub trait OneshotObject {
    /// Name of the object.
    fn name(&self) -> &str;

    /// Path of the object within the source.
    fn path(&self) -> &str;

    /// Size of the object in bytes.
    fn size(&self) -> usize;

    /// Encodings (e.g. compression) applied to the object.
    fn encodings(&self) -> &[Encoding];
}

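/// An encoding (e.g. compression) that has been applied to an object.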
#[derive(Debug, Copy, Clone, Serialize, Deserialize)]
pub enum Encoding {
    Bzip2,
    Gzip,
    Xz,
    Zstd,
}

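/// Defines a remote system from which objects can be fetched for a "oneshot"
/// ingestion, e.g. an HTTP server or an S3 bucket.
///
/// A minimal usage sketch (hypothetical `source` value; marked `ignore`
/// because it needs an async context and a concrete implementation):
///
/// ```ignore
/// use futures::StreamExt;
///
/// // List every object in the source, along with its checksum.
/// for (object, checksum) in source.list().await? {
///     // Stream the first 1 KiB of each object.
///     let mut bytes = source.get(object, checksum, Some(0..=1023));
///     while let Some(chunk) = bytes.next().await {
///         println!("fetched {} bytes", chunk?.len());
///     }
/// }
/// ```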
pub trait OneshotSource: Clone + Send + Unpin {
    /// An individual object from the source, e.g. a file.
    type Object: OneshotObject
        + Debug
        + Clone
        + Send
        + Unpin
        + Serialize
        + DeserializeOwned
        + 'static;
    /// Checksum used to validate the contents of an object.
    type Checksum: Debug + Clone + Send + Unpin + Serialize + DeserializeOwned + 'static;

    /// List all of the objects available in the source.
    fn list<'a>(
        &'a self,
    ) -> impl Future<Output = Result<Vec<(Self::Object, Self::Checksum)>, StorageErrorX>> + Send;

    /// Stream the bytes of `object`, optionally restricted to `range`.
    fn get<'s>(
        &'s self,
        object: Self::Object,
        checksum: Self::Checksum,
        range: Option<std::ops::RangeInclusive<usize>>,
    ) -> BoxStream<'s, Result<Bytes, StorageErrorX>>;
}

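/// An enum wrapper around [`OneshotSource`]s.
///
/// Lets the rendering code work with a single concrete type while dispatching
/// to the underlying source implementations.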
#[derive(Clone, Debug)]
pub(crate) enum SourceKind {
    Http(HttpOneshotSource),
    AwsS3(AwsS3Source),
}

impl OneshotSource for SourceKind {
    type Object = ObjectKind;
    type Checksum = ChecksumKind;

    async fn list<'a>(&'a self) -> Result<Vec<(Self::Object, Self::Checksum)>, StorageErrorX> {
        match self {
            SourceKind::Http(http) => {
                let objects = http.list().await.context("http")?;
                let objects = objects
                    .into_iter()
                    .map(|(object, checksum)| {
                        (ObjectKind::Http(object), ChecksumKind::Http(checksum))
                    })
                    .collect();
                Ok(objects)
            }
            SourceKind::AwsS3(s3) => {
                let objects = s3.list().await.context("s3")?;
                let objects = objects
                    .into_iter()
                    .map(|(object, checksum)| {
                        (ObjectKind::AwsS3(object), ChecksumKind::AwsS3(checksum))
                    })
                    .collect();
                Ok(objects)
            }
        }
    }

    fn get<'s>(
        &'s self,
        object: Self::Object,
        checksum: Self::Checksum,
        range: Option<std::ops::RangeInclusive<usize>>,
    ) -> BoxStream<'s, Result<Bytes, StorageErrorX>> {
        match (self, object, checksum) {
            (SourceKind::Http(http), ObjectKind::Http(object), ChecksumKind::Http(checksum)) => {
                http.get(object, checksum, range)
                    .map(|result| result.context("http"))
                    .boxed()
            }
            (SourceKind::AwsS3(s3), ObjectKind::AwsS3(object), ChecksumKind::AwsS3(checksum)) => s3
                .get(object, checksum, range)
                .map(|result| result.context("aws_s3"))
                .boxed(),
            (SourceKind::AwsS3(_) | SourceKind::Http(_), _, _) => {
                unreachable!("programming error! wrong source, object, and checksum kind");
            }
        }
    }
}

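/// An enum wrapper around the object types of our [`OneshotSource`]s.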
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) enum ObjectKind {
    Http(HttpObject),
    AwsS3(S3Object),
}

impl OneshotObject for ObjectKind {
    fn name(&self) -> &str {
        match self {
            ObjectKind::Http(object) => object.name(),
            ObjectKind::AwsS3(object) => object.name(),
        }
    }

    fn path(&self) -> &str {
        match self {
            ObjectKind::Http(object) => object.path(),
            ObjectKind::AwsS3(object) => object.path(),
        }
    }

    fn size(&self) -> usize {
        match self {
            ObjectKind::Http(object) => object.size(),
            ObjectKind::AwsS3(object) => object.size(),
        }
    }

    fn encodings(&self) -> &[Encoding] {
        match self {
            ObjectKind::Http(object) => object.encodings(),
            ObjectKind::AwsS3(object) => object.encodings(),
        }
    }
}

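/// An enum wrapper around the checksum types of our [`OneshotSource`]s.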
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) enum ChecksumKind {
    Http(HttpChecksum),
    AwsS3(S3Checksum),
}

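/// Defines a format (e.g. CSV or Parquet) that can be fetched and decoded in
/// a "oneshot" ingestion.
///
/// Decoding happens in three phases:
///
/// 1. [`OneshotFormat::split_work`]: split an object into individually
///    fetchable units of work, e.g. Parquet row groups.
/// 2. [`OneshotFormat::fetch_work`]: fetch a single unit of work, yielding a
///    stream of record chunks.
/// 3. [`OneshotFormat::decode_chunk`]: decode a chunk of records into
///    [`Row`]s.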
pub trait OneshotFormat: Clone {
    /// A single unit of work for fetching data from a [`OneshotSource`].
    type WorkRequest<S>: Debug + Clone + Send + Serialize + DeserializeOwned + 'static
    where
        S: OneshotSource;
    /// A chunk of records in this format that can be decoded into [`Row`]s.
    type RecordChunk: Debug + Clone + Send + Serialize + DeserializeOwned + 'static;

    /// Given an object, split it into units of work that can be fetched in
    /// parallel.
    fn split_work<S: OneshotSource + Send>(
        &self,
        source: S,
        object: S::Object,
        checksum: S::Checksum,
    ) -> impl Future<Output = Result<Vec<Self::WorkRequest<S>>, StorageErrorX>> + Send;

    /// Fetch a single unit of work, returning a stream of record chunks.
    fn fetch_work<'a, S: OneshotSource + Sync + 'static>(
        &'a self,
        source: &'a S,
        request: Self::WorkRequest<S>,
    ) -> BoxStream<'a, Result<Self::RecordChunk, StorageErrorX>>;

    /// Decode a chunk of records into `rows`, returning the number of decoded
    /// rows.
    fn decode_chunk(
        &self,
        chunk: Self::RecordChunk,
        rows: &mut Vec<Row>,
    ) -> Result<usize, StorageErrorX>;
}

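/// An enum wrapper around [`OneshotFormat`]s.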
#[derive(Clone, Debug)]
pub(crate) enum FormatKind {
    Csv(CsvDecoder),
    Parquet(ParquetFormat),
}

impl OneshotFormat for FormatKind {
    type WorkRequest<S>
        = RequestKind<S::Object, S::Checksum>
    where
        S: OneshotSource;
    type RecordChunk = RecordChunkKind;

    async fn split_work<S: OneshotSource + Send>(
        &self,
        source: S,
        object: S::Object,
        checksum: S::Checksum,
    ) -> Result<Vec<Self::WorkRequest<S>>, StorageErrorX> {
        match self {
            FormatKind::Csv(csv) => {
                let work = csv
                    .split_work(source, object, checksum)
                    .await
                    .context("csv")?
                    .into_iter()
                    .map(RequestKind::Csv)
                    .collect();
                Ok(work)
            }
            FormatKind::Parquet(parquet) => {
                let work = parquet
                    .split_work(source, object, checksum)
                    .await
                    .context("parquet")?
                    .into_iter()
                    .map(RequestKind::Parquet)
                    .collect();
                Ok(work)
            }
        }
    }

    fn fetch_work<'a, S: OneshotSource + Sync + 'static>(
        &'a self,
        source: &'a S,
        request: Self::WorkRequest<S>,
    ) -> BoxStream<'a, Result<Self::RecordChunk, StorageErrorX>> {
        match (self, request) {
            (FormatKind::Csv(csv), RequestKind::Csv(request)) => csv
                .fetch_work(source, request)
                .map_ok(RecordChunkKind::Csv)
                .map(|result| result.context("csv"))
                .boxed(),
            (FormatKind::Parquet(parquet), RequestKind::Parquet(request)) => parquet
                .fetch_work(source, request)
                .map_ok(RecordChunkKind::Parquet)
                .map(|result| result.context("parquet"))
                .boxed(),
            (FormatKind::Parquet(_), RequestKind::Csv(_))
            | (FormatKind::Csv(_), RequestKind::Parquet(_)) => {
                unreachable!("programming error, {self:?}")
            }
        }
    }

    fn decode_chunk(
        &self,
        chunk: Self::RecordChunk,
        rows: &mut Vec<Row>,
    ) -> Result<usize, StorageErrorX> {
        match (self, chunk) {
            (FormatKind::Csv(csv), RecordChunkKind::Csv(chunk)) => {
                csv.decode_chunk(chunk, rows).context("csv")
            }
            (FormatKind::Parquet(parquet), RecordChunkKind::Parquet(chunk)) => {
                parquet.decode_chunk(chunk, rows).context("parquet")
            }
            (FormatKind::Parquet(_), RecordChunkKind::Csv(_))
            | (FormatKind::Csv(_), RecordChunkKind::Parquet(_)) => {
                unreachable!("programming error, {self:?}")
            }
        }
    }
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub(crate) enum RequestKind<O, C> {
    Csv(CsvWorkRequest<O, C>),
    Parquet(ParquetWorkRequest<O, C>),
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub(crate) enum RecordChunkKind {
    Csv(CsvRecord),
    Parquet(ParquetRowGroup),
}

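/// Filters which objects discovered in a source get ingested.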
pub(crate) enum ObjectFilter {
    None,
    Files(BTreeSet<Box<str>>),
    Pattern(glob::Pattern),
}

impl ObjectFilter {
    pub fn try_new(filter: ContentFilter) -> Result<Self, anyhow::Error> {
        match filter {
            ContentFilter::None => Ok(ObjectFilter::None),
            ContentFilter::Files(files) => {
                let files = files.into_iter().map(|f| f.into()).collect();
                Ok(ObjectFilter::Files(files))
            }
            ContentFilter::Pattern(pattern) => {
                let pattern = glob::Pattern::new(&pattern)?;
                Ok(ObjectFilter::Pattern(pattern))
            }
        }
    }

    /// Returns whether the object should be included in the ingestion.
    pub fn filter<S: OneshotSource>(&self, object: &S::Object) -> bool {
        match self {
            ObjectFilter::None => true,
            ObjectFilter::Files(files) => files.contains(object.path()),
            ObjectFilter::Pattern(pattern) => pattern.matches(object.path()),
        }
    }
}

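/// A serializable error type used throughout the "oneshot" ingestion dataflow.
///
/// The `kind` field carries the underlying error and `context` records the
/// chain of operations that led to it, similar to `anyhow::Context`.
///
/// A small sketch of how context accumulates (marked `ignore` since it is
/// illustrative only):
///
/// ```ignore
/// let err: StorageErrorX = StorageErrorXKind::MissingSize.with_context("discover");
/// // Each `.context(...)` call pushes another frame onto the error's context.
/// let result: Result<(), _> = Err(err).context("list");
/// ```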
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct StorageErrorX {
    kind: StorageErrorXKind,
    context: LinkedList<String>,
}

impl fmt::Display for StorageErrorX {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        writeln!(f, "error: {}", self.kind)?;
        writeln!(f, "causes: {:?}", self.context)?;
        Ok(())
    }
}

#[derive(Debug, Clone, Serialize, Deserialize, thiserror::Error)]
pub enum StorageErrorXKind {
    #[error("csv decoding error: {0}")]
    CsvDecoding(Arc<str>),
    #[error("parquet error: {0}")]
    ParquetError(Arc<str>),
    #[error("reqwest error: {0}")]
    Reqwest(Arc<str>),
    #[error("aws s3 request error: {0}")]
    AwsS3Request(String),
    #[error("aws s3 bytestream error: {0}")]
    AwsS3Bytes(Arc<str>),
    #[error("invalid reqwest header: {0}")]
    InvalidHeader(Arc<str>),
    #[error("failed to decode Row from a record batch: {0}")]
    InvalidRecordBatch(Arc<str>),
    #[error("programming error: {0}")]
    ProgrammingError(Arc<str>),
    #[error("failed to get the size of an object")]
    MissingSize,
    #[error("object is missing the required '{0}' field")]
    MissingField(Arc<str>),
    #[error("failed while evaluating the provided mfp: '{0}'")]
    MfpEvalError(Arc<str>),
    #[error("no matching files found at the given url")]
    NoMatchingFiles,
    #[error("something went wrong: {0}")]
    Generic(String),
}

impl From<csv_async::Error> for StorageErrorXKind {
    fn from(err: csv_async::Error) -> Self {
        StorageErrorXKind::CsvDecoding(err.to_string().into())
    }
}

impl From<reqwest::Error> for StorageErrorXKind {
    fn from(err: reqwest::Error) -> Self {
        StorageErrorXKind::Reqwest(err.to_string().into())
    }
}

impl From<reqwest::header::ToStrError> for StorageErrorXKind {
    fn from(err: reqwest::header::ToStrError) -> Self {
        StorageErrorXKind::InvalidHeader(err.to_string().into())
    }
}

impl From<aws_smithy_types::byte_stream::error::Error> for StorageErrorXKind {
    fn from(err: aws_smithy_types::byte_stream::error::Error) -> Self {
        StorageErrorXKind::AwsS3Bytes(DisplayErrorContext(err).to_string().into())
    }
}

impl From<::parquet::errors::ParquetError> for StorageErrorXKind {
    fn from(err: ::parquet::errors::ParquetError) -> Self {
        StorageErrorXKind::ParquetError(err.to_string().into())
    }
}

impl StorageErrorXKind {
    pub fn with_context<C: Display>(self, context: C) -> StorageErrorX {
        StorageErrorX {
            kind: self,
            context: LinkedList::from([context.to_string()]),
        }
    }

    pub fn invalid_record_batch<S: Into<Arc<str>>>(error: S) -> StorageErrorXKind {
        StorageErrorXKind::InvalidRecordBatch(error.into())
    }

    pub fn generic<C: Display>(error: C) -> StorageErrorXKind {
        StorageErrorXKind::Generic(error.to_string())
    }

    pub fn programming_error<S: Into<Arc<str>>>(error: S) -> StorageErrorXKind {
        StorageErrorXKind::ProgrammingError(error.into())
    }
}

impl<E> From<E> for StorageErrorX
where
    E: Into<StorageErrorXKind>,
{
    fn from(err: E) -> Self {
        StorageErrorX {
            kind: err.into(),
            context: LinkedList::new(),
        }
    }
}

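/// Helper trait for attaching context to a [`StorageErrorX`], analogous to
/// `anyhow::Context`.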
trait StorageErrorXContext<T> {
    fn context<C>(self, context: C) -> Result<T, StorageErrorX>
    where
        C: Display;
}

impl<T, E> StorageErrorXContext<T> for Result<T, E>
where
    E: Into<StorageErrorXKind>,
{
    fn context<C>(self, context: C) -> Result<T, StorageErrorX>
    where
        C: Display,
    {
        match self {
            Ok(val) => Ok(val),
            Err(kind) => Err(StorageErrorX {
                kind: kind.into(),
                context: LinkedList::from([context.to_string()]),
            }),
        }
    }
}

impl<T> StorageErrorXContext<T> for Result<T, StorageErrorX> {
    fn context<C>(self, context: C) -> Result<T, StorageErrorX>
    where
        C: Display,
    {
        match self {
            Ok(val) => Ok(val),
            Err(mut e) => {
                e.context.push_back(context.to_string());
                Err(e)
            }
        }
    }
}