use std::collections::{BTreeSet, LinkedList};
use std::fmt::{self, Debug, Display};
use std::future::Future;
use std::sync::Arc;

use bytes::Bytes;
use differential_dataflow::Hashable;
use futures::stream::BoxStream;
use futures::{StreamExt, TryStreamExt};
use mz_expr::SafeMfpPlan;
use mz_ore::cast::CastFrom;
use mz_persist_client::Diagnostics;
use mz_persist_client::batch::ProtoBatch;
use mz_persist_client::cache::PersistClientCache;
use mz_persist_types::Codec;
use mz_persist_types::codec_impls::UnitSchema;
use mz_repr::{DatumVec, GlobalId, Row, RowArena, Timestamp};
use mz_storage_types::StorageDiff;
use mz_storage_types::connections::ConnectionContext;
use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::oneshot_sources::{
    ContentFilter, ContentFormat, ContentSource, OneshotIngestionRequest,
};
use mz_storage_types::sources::SourceData;
use mz_timely_util::builder_async::{
    Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
};
use mz_timely_util::pact::Distribute;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use timely::container::CapacityContainerBuilder;
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::{Scope, StreamVec};
use timely::progress::Antichain;
use tracing::info;

use crate::oneshot_source::aws_source::{AwsS3Source, S3Checksum, S3Object};
use crate::oneshot_source::csv::{CsvDecoder, CsvRecord, CsvWorkRequest};
use crate::oneshot_source::http_source::{HttpChecksum, HttpObject, HttpOneshotSource};
use crate::oneshot_source::parquet::{ParquetFormat, ParquetRowGroup, ParquetWorkRequest};

pub mod csv;
pub mod parquet;

pub mod aws_source;
pub mod http_source;

mod util;

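/// Render a dataflow that does a "oneshot" ingestion of `request` into the collection
/// identified by `collection_id`, eventually invoking `worker_callback` with either a
/// [`ProtoBatch`] ready to be appended, or an error.
///
/// The dataflow is a pipeline of the following operators:
///
/// 1. Discover: a single worker lists the objects at the remote source and applies the
///    [`ContentFilter`].
/// 2. Split Work: each object is split into format-specific work requests.
/// 3. Fetch Work: work requests are fetched and turned into chunks of records.
/// 4. Decode Chunk: record chunks are decoded into [`Row`]s and mapped through the
///    source's MFP.
/// 5. Stage Batches: rows are staged as a Persist batch, without being appended.
/// 6. Completion: the staged batch (or an error) is reported via `worker_callback`.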
pub fn render<G, F>(
    scope: G,
    persist_clients: Arc<PersistClientCache>,
    connection_context: ConnectionContext,
    collection_id: GlobalId,
    collection_meta: CollectionMetadata,
    request: OneshotIngestionRequest,
    worker_callback: F,
) -> Vec<PressOnDropButton>
where
    G: Scope<Timestamp = Timestamp>,
    F: FnOnce(Result<Option<ProtoBatch>, String>) -> () + 'static,
{
    let OneshotIngestionRequest {
        source,
        format,
        filter,
        shape,
    } = request;

    let source = match source {
        ContentSource::Http { url } => {
            let source = HttpOneshotSource::new(reqwest::Client::default(), url);
            SourceKind::Http(source)
        }
        ContentSource::AwsS3 {
            connection,
            connection_id,
            uri,
        } => {
            let source = AwsS3Source::new(connection, connection_id, connection_context, uri);
            SourceKind::AwsS3(source)
        }
    };
    tracing::info!(?source, "created oneshot source");

    let format = match format {
        ContentFormat::Csv(params) => {
            let format = CsvDecoder::new(params, &shape.source_desc);
            FormatKind::Csv(format)
        }
        ContentFormat::Parquet => {
            let format = ParquetFormat::new(shape.source_desc);
            FormatKind::Parquet(format)
        }
    };

    let (objects_stream, discover_token) =
        render_discover_objects(scope.clone(), collection_id, source.clone(), filter);
    let (work_stream, split_token) = render_split_work(
        scope.clone(),
        collection_id,
        objects_stream,
        source.clone(),
        format.clone(),
    );
    let (records_stream, fetch_token) = render_fetch_work(
        scope.clone(),
        collection_id,
        source.clone(),
        format.clone(),
        work_stream,
    );
    let (rows_stream, decode_token) = render_decode_chunk(
        scope.clone(),
        format.clone(),
        records_stream,
        shape.source_mfp,
    );
    let (batch_stream, batch_token) = render_stage_batches_operator(
        scope.clone(),
        collection_id,
        &collection_meta,
        persist_clients,
        rows_stream,
    );

    render_completion_operator(scope, batch_stream, worker_callback);

    let tokens = vec![
        discover_token,
        split_token,
        fetch_token,
        decode_token,
        batch_token,
    ];

    tokens
}

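/// Render an operator that uses a single, deterministically chosen worker to list the
/// objects at the remote source and filter them with the provided [`ContentFilter`].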
pub fn render_discover_objects<G, S>(
    scope: G,
    collection_id: GlobalId,
    source: S,
    filter: ContentFilter,
) -> (
    StreamVec<G, Result<(S::Object, S::Checksum), StorageErrorX>>,
    PressOnDropButton,
)
where
    G: Scope<Timestamp = Timestamp>,
    S: OneshotSource + 'static,
{
    let worker_id = scope.index();
    let num_workers = scope.peers();
    let active_worker_id = usize::cast_from((collection_id, "discover").hashed()) % num_workers;
    let is_active_worker = worker_id == active_worker_id;

    let mut builder = AsyncOperatorBuilder::new("CopyFrom-discover".to_string(), scope.clone());

    let (start_handle, start_stream) = builder.new_output::<CapacityContainerBuilder<_>>();

    let shutdown = builder.build(move |caps| async move {
        let [start_cap] = caps.try_into().unwrap();

        if !is_active_worker {
            return;
        }

        let filter = match ObjectFilter::try_new(filter) {
            Ok(filter) => filter,
            Err(err) => {
                tracing::warn!(?err, "failed to create filter");
                start_handle.give(&start_cap, Err(StorageErrorXKind::generic(err).into()));
                return;
            }
        };

        let work = source.list().await.context("list");
        match work {
            Ok(objects) => {
                let (include, exclude): (Vec<_>, Vec<_>) = objects
                    .into_iter()
                    .partition(|(o, _checksum)| filter.filter::<S>(o));
                tracing::info!(%worker_id, ?include, ?exclude, "listed objects");
                if include.is_empty() {
                    let err = StorageErrorXKind::NoMatchingFiles.with_context("discover");
                    start_handle.give(&start_cap, Err(err));
                    return;
                }
                include
                    .into_iter()
                    .for_each(|object| start_handle.give(&start_cap, Ok(object)))
            }
            Err(err) => {
                tracing::warn!(?err, "failed to list oneshot source");
                start_handle.give(&start_cap, Err(err))
            }
        }
    });

    (start_stream, shutdown.press_on_drop())
}

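/// Render an operator that splits each discovered object into format-specific work
/// requests (e.g. one per Parquet row group), so a single object can be fetched in
/// parallel. Objects are distributed across workers via the [`Distribute`] pact.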
pub fn render_split_work<G, S, F>(
    scope: G,
    collection_id: GlobalId,
    objects: StreamVec<G, Result<(S::Object, S::Checksum), StorageErrorX>>,
    source: S,
    format: F,
) -> (
    StreamVec<G, Result<F::WorkRequest<S>, StorageErrorX>>,
    PressOnDropButton,
)
where
    G: Scope,
    S: OneshotSource + Send + Sync + 'static,
    F: OneshotFormat + Send + Sync + 'static,
{
    let worker_id = scope.index();
    let mut builder = AsyncOperatorBuilder::new("CopyFrom-split_work".to_string(), scope.clone());

    let (request_handle, request_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
    let mut objects_handle = builder.new_input_for(objects, Distribute, &request_handle);

    let shutdown = builder.build(move |caps| async move {
        let [_objects_cap] = caps.try_into().unwrap();

        info!(%collection_id, %worker_id, "CopyFrom Split Work");

        while let Some(event) = objects_handle.next().await {
            let (capability, maybe_objects) = match event {
                AsyncEvent::Data(cap, req) => (cap, req),
                AsyncEvent::Progress(_) => continue,
            };

            let result = async {
                let mut requests = Vec::new();

                for maybe_object in maybe_objects {
                    let (object, checksum) = maybe_object?;

                    let format_ = format.clone();
                    let source_ = source.clone();
                    // The spawned task resolves to a `Result<_, JoinError>` that must be
                    // unwrapped before `?` can surface the inner `StorageErrorX`.
                    let work_requests = mz_ore::task::spawn(|| "split-work", async move {
                        info!(%worker_id, object = %object.name(), "splitting object");
                        format_.split_work(source_.clone(), object, checksum).await
                    })
                    .await
                    .expect("failed to spawn task")?;

                    requests.extend(work_requests);
                }

                Ok::<_, StorageErrorX>(requests)
            }
            .await
            .context("split");

            match result {
                Ok(requests) => requests
                    .into_iter()
                    .for_each(|req| request_handle.give(&capability, Ok(req))),
                Err(err) => request_handle.give(&capability, Err(err)),
            }
        }
    });

    (request_stream, shutdown.press_on_drop())
}

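/// Render an operator that fetches the data for each work request from the remote
/// source, emitting format-specific chunks of records.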
pub fn render_fetch_work<G, S, F>(
    scope: G,
    collection_id: GlobalId,
    source: S,
    format: F,
    work_requests: StreamVec<G, Result<F::WorkRequest<S>, StorageErrorX>>,
) -> (
    StreamVec<G, Result<F::RecordChunk, StorageErrorX>>,
    PressOnDropButton,
)
where
    G: Scope,
    S: OneshotSource + Sync + 'static,
    F: OneshotFormat + Sync + 'static,
{
    let worker_id = scope.index();
    let mut builder = AsyncOperatorBuilder::new("CopyFrom-fetch_work".to_string(), scope.clone());

    let (record_handle, record_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
    let mut work_requests_handle = builder.new_input_for(work_requests, Distribute, &record_handle);

    let shutdown = builder.build(move |caps| async move {
        let [_work_cap] = caps.try_into().unwrap();

        info!(%collection_id, %worker_id, "CopyFrom Fetch Work");

        while let Some(event) = work_requests_handle.next().await {
            let (capability, maybe_requests) = match event {
                AsyncEvent::Data(cap, req) => (cap, req),
                AsyncEvent::Progress(_) => continue,
            };

            let result = async {
                for maybe_request in maybe_requests {
                    let request = maybe_request?;

                    let mut work_stream = format.fetch_work(&source, request);
                    while let Some(result) = work_stream.next().await {
                        let record_chunk = result.context("fetch worker")?;

                        record_handle.give(&capability, Ok(record_chunk));
                    }
                }

                Ok::<_, StorageErrorX>(())
            }
            .await
            .context("fetch work");

            if let Err(err) = result {
                tracing::warn!(?err, "failed to fetch");
                record_handle.give(&capability, Err(err))
            }
        }
    });

    (record_stream, shutdown.press_on_drop())
}

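/// Render an operator that decodes chunks of records into [`Row`]s and maps each one
/// through the source's [`SafeMfpPlan`].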
pub fn render_decode_chunk<G, F>(
    scope: G,
    format: F,
    record_chunks: StreamVec<G, Result<F::RecordChunk, StorageErrorX>>,
    mfp: SafeMfpPlan,
) -> (StreamVec<G, Result<Row, StorageErrorX>>, PressOnDropButton)
where
    G: Scope,
    F: OneshotFormat + 'static,
{
    let mut builder = AsyncOperatorBuilder::new("CopyFrom-decode_chunk".to_string(), scope.clone());

    let (row_handle, row_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
    let mut record_chunk_handle = builder.new_input_for(record_chunks, Distribute, &row_handle);

    let shutdown = builder.build(move |caps| async move {
        let [_row_cap] = caps.try_into().unwrap();

        let mut datum_vec = DatumVec::default();
        let row_arena = RowArena::default();
        let mut row_buf = Row::default();

        while let Some(event) = record_chunk_handle.next().await {
            let (capability, maybe_chunks) = match event {
                AsyncEvent::Data(cap, data) => (cap, data),
                AsyncEvent::Progress(_) => continue,
            };

            let result = async {
                let mut rows = Vec::new();
                for maybe_chunk in maybe_chunks {
                    let chunk = maybe_chunk?;
                    format.decode_chunk(chunk, &mut rows)?;
                }
                Ok::<_, StorageErrorX>(rows)
            }
            .await
            .context("decode chunk");

            match result {
                Ok(rows) => {
                    for row in rows {
                        let mut datums = datum_vec.borrow_with(&row);
                        let result = mfp
                            .evaluate_into(&mut *datums, &row_arena, &mut row_buf)
                            .map(|row| row.cloned());

                        match result {
                            Ok(Some(row)) => row_handle.give(&capability, Ok(row)),
                            Ok(None) => {
                                mz_ore::soft_panic_or_log!("oneshot source MFP filtered out data!");
                            }
                            Err(e) => {
                                let err = StorageErrorXKind::MfpEvalError(e.to_string().into())
                                    .with_context("decode");
                                row_handle.give(&capability, Err(err))
                            }
                        }
                    }
                }
                Err(err) => row_handle.give(&capability, Err(err)),
            }
        }
    });

    (row_stream, shutdown.press_on_drop())
}

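/// Render an operator that stages all of the [`Row`]s it receives in a Persist batch,
/// emitting the resulting [`ProtoBatch`] without appending it to the shard. If any
/// upstream error arrives, the in-progress batch is deleted and the error is forwarded
/// instead.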
pub fn render_stage_batches_operator<G>(
    scope: G,
    collection_id: GlobalId,
    collection_meta: &CollectionMetadata,
    persist_clients: Arc<PersistClientCache>,
    rows_stream: StreamVec<G, Result<Row, StorageErrorX>>,
) -> (
    StreamVec<G, Result<ProtoBatch, StorageErrorX>>,
    PressOnDropButton,
)
where
    G: Scope,
{
    let persist_location = collection_meta.persist_location.clone();
    let shard_id = collection_meta.data_shard;
    let collection_desc = Arc::new(collection_meta.relation_desc.clone());

    let mut builder =
        AsyncOperatorBuilder::new("CopyFrom-stage_batches".to_string(), scope.clone());

    let (proto_batch_handle, proto_batch_stream) =
        builder.new_output::<CapacityContainerBuilder<_>>();
    let mut rows_handle = builder.new_input_for(rows_stream, Pipeline, &proto_batch_handle);

    let shutdown = builder.build(move |caps| async move {
        let [proto_batch_cap] = caps.try_into().unwrap();

        let persist_client = persist_clients
            .open(persist_location)
            .await
            .expect("failed to open Persist client");
        let persist_diagnostics = Diagnostics {
            shard_name: collection_id.to_string(),
            handle_purpose: "CopyFrom::stage_batches".to_string(),
        };
        let write_handle = persist_client
            .open_writer::<SourceData, (), mz_repr::Timestamp, StorageDiff>(
                shard_id,
                Arc::clone(&collection_desc),
                Arc::new(UnitSchema),
                persist_diagnostics,
            )
            .await
            .expect("could not open Persist shard");

        let lower = mz_repr::Timestamp::MIN;
        let upper = Antichain::from_elem(lower.step_forward());

        let mut batch_builder = write_handle.builder(Antichain::from_elem(lower));

        while let Some(event) = rows_handle.next().await {
            let AsyncEvent::Data(_, row_batch) = event else {
                continue;
            };

            for maybe_row in row_batch {
                let maybe_row = maybe_row.and_then(|row| {
                    Row::validate(&row, &*collection_desc).map_err(|e| {
                        StorageErrorXKind::invalid_record_batch(e).with_context("stage_batches")
                    })?;
                    Ok(row)
                });
                match maybe_row {
                    Ok(row) => {
                        let data = SourceData(Ok(row));
                        batch_builder
                            .add(&data, &(), &lower, &1)
                            .await
                            .expect("failed to add Row to batch");
                    }
                    Err(err) => {
                        let batch = batch_builder
                            .finish(upper)
                            .await
                            .expect("failed to cleanup batch");
                        batch.delete().await;

                        proto_batch_handle
                            .give(&proto_batch_cap, Err(err).context("stage batches"));
                        return;
                    }
                }
            }
        }

        let batch = batch_builder
            .finish(upper)
            .await
            .expect("failed to create Batch");

        let proto_batch = batch.into_transmittable_batch();
        proto_batch_handle.give(&proto_batch_cap, Ok(proto_batch));
    });

    (proto_batch_stream, shutdown.press_on_drop())
}

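/// Render an operator that reports this worker's final result, either a staged
/// [`ProtoBatch`] (if any) or an error rendered as a `String`, via `worker_callback`.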
pub fn render_completion_operator<G, F>(
    scope: G,
    results_stream: StreamVec<G, Result<ProtoBatch, StorageErrorX>>,
    worker_callback: F,
) where
    G: Scope,
    F: FnOnce(Result<Option<ProtoBatch>, String>) -> () + 'static,
{
    let mut builder = AsyncOperatorBuilder::new("CopyFrom-completion".to_string(), scope.clone());
    let mut results_input = builder.new_disconnected_input(results_stream, Pipeline);

    builder.build(move |_| async move {
        let result = async move {
            let mut maybe_payload: Option<ProtoBatch> = None;

            while let Some(event) = results_input.next().await {
                if let AsyncEvent::Data(_cap, results) = event {
                    let [result] = results
                        .try_into()
                        .expect("only 1 event on the result stream");

                    if maybe_payload.is_some() {
                        panic!("expected only one batch!");
                    }

                    maybe_payload = Some(result.map_err(|e| e.to_string())?);
                }
            }

            Ok(maybe_payload)
        }
        .await;

        worker_callback(result);
    });
}

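/// An object, e.g. a file, that can be fetched from a [`OneshotSource`].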
pub trait OneshotObject {
    /// Name of the object, including any extensions.
    fn name(&self) -> &str;

    /// Path of the object within the remote source.
    fn path(&self) -> &str;

    /// Size of the object in bytes.
    fn size(&self) -> usize;

    /// Encodings applied to the object, e.g. compression.
    fn encodings(&self) -> &[Encoding];
}

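/// Encodings (e.g. compression) that can be applied to a [`OneshotObject`].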
#[derive(Debug, Copy, Clone, Serialize, Deserialize)]
pub enum Encoding {
    Bzip2,
    Gzip,
    Xz,
    Zstd,
}

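/// Defines a remote system that we can fetch data from for a oneshot ingestion, e.g.
/// an HTTP server or an S3 bucket.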
pub trait OneshotSource: Clone + Send + Unpin {
    /// An individual object, e.g. a file, at the remote source.
    type Object: OneshotObject
        + Debug
        + Clone
        + Send
        + Unpin
        + Serialize
        + DeserializeOwned
        + 'static;
    /// Checksum for an individual object.
    type Checksum: Debug + Clone + Send + Unpin + Serialize + DeserializeOwned + 'static;

    /// List all of the objects at the remote source.
    fn list<'a>(
        &'a self,
    ) -> impl Future<Output = Result<Vec<(Self::Object, Self::Checksum)>, StorageErrorX>> + Send;

    /// Get the data for `object`, optionally restricted to `range`.
    fn get<'s>(
        &'s self,
        object: Self::Object,
        checksum: Self::Checksum,
        range: Option<std::ops::RangeInclusive<usize>>,
    ) -> BoxStream<'s, Result<Bytes, StorageErrorX>>;
}

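/// An enum wrapper around our supported [`OneshotSource`]s.
///
/// We use an enum instead of a trait object because the `OneshotSource` trait is not
/// object safe: its methods return `impl Future` and streams parameterized by
/// associated types.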
#[derive(Clone, Debug)]
pub(crate) enum SourceKind {
    Http(HttpOneshotSource),
    AwsS3(AwsS3Source),
}

impl OneshotSource for SourceKind {
    type Object = ObjectKind;
    type Checksum = ChecksumKind;

    async fn list<'a>(&'a self) -> Result<Vec<(Self::Object, Self::Checksum)>, StorageErrorX> {
        match self {
            SourceKind::Http(http) => {
                let objects = http.list().await.context("http")?;
                let objects = objects
                    .into_iter()
                    .map(|(object, checksum)| {
                        (ObjectKind::Http(object), ChecksumKind::Http(checksum))
                    })
                    .collect();
                Ok(objects)
            }
            SourceKind::AwsS3(s3) => {
                let objects = s3.list().await.context("s3")?;
                let objects = objects
                    .into_iter()
                    .map(|(object, checksum)| {
                        (ObjectKind::AwsS3(object), ChecksumKind::AwsS3(checksum))
                    })
                    .collect();
                Ok(objects)
            }
        }
    }

    fn get<'s>(
        &'s self,
        object: Self::Object,
        checksum: Self::Checksum,
        range: Option<std::ops::RangeInclusive<usize>>,
    ) -> BoxStream<'s, Result<Bytes, StorageErrorX>> {
        match (self, object, checksum) {
            (SourceKind::Http(http), ObjectKind::Http(object), ChecksumKind::Http(checksum)) => {
                http.get(object, checksum, range)
                    .map(|result| result.context("http"))
                    .boxed()
            }
            (SourceKind::AwsS3(s3), ObjectKind::AwsS3(object), ChecksumKind::AwsS3(checksum)) => s3
                .get(object, checksum, range)
                .map(|result| result.context("aws_s3"))
                .boxed(),
            (SourceKind::AwsS3(_) | SourceKind::Http(_), _, _) => {
                unreachable!("programming error! wrong source, object, and checksum kind");
            }
        }
    }
}

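/// An enum wrapper around the objects of our supported [`OneshotSource`]s.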
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) enum ObjectKind {
    Http(HttpObject),
    AwsS3(S3Object),
}

impl OneshotObject for ObjectKind {
    fn name(&self) -> &str {
        match self {
            ObjectKind::Http(object) => object.name(),
            ObjectKind::AwsS3(object) => object.name(),
        }
    }

    fn path(&self) -> &str {
        match self {
            ObjectKind::Http(object) => object.path(),
            ObjectKind::AwsS3(object) => object.path(),
        }
    }

    fn size(&self) -> usize {
        match self {
            ObjectKind::Http(object) => object.size(),
            ObjectKind::AwsS3(object) => object.size(),
        }
    }

    fn encodings(&self) -> &[Encoding] {
        match self {
            ObjectKind::Http(object) => object.encodings(),
            ObjectKind::AwsS3(object) => object.encodings(),
        }
    }
}

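/// An enum wrapper around the checksums of our supported [`OneshotSource`]s.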
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) enum ChecksumKind {
    Http(HttpChecksum),
    AwsS3(S3Checksum),
}

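/// Defines a format that we can decode for a oneshot ingestion, e.g. CSV or Parquet.
///
/// Work proceeds in three phases:
///
/// 1. [`split_work`]: split an object into format-specific work requests, which allows
///    fetching a single object in parallel.
/// 2. [`fetch_work`]: fetch one work request, emitting chunks of records.
/// 3. [`decode_chunk`]: decode a chunk of records into [`Row`]s.
///
/// [`split_work`]: OneshotFormat::split_work
/// [`fetch_work`]: OneshotFormat::fetch_work
/// [`decode_chunk`]: OneshotFormat::decode_chunk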
pub trait OneshotFormat: Clone {
    /// A single unit of work for decoding this format.
    type WorkRequest<S>: Debug + Clone + Send + Serialize + DeserializeOwned + 'static
    where
        S: OneshotSource;
    /// A chunk of records in this format that can be decoded into `Row`s.
    type RecordChunk: Debug + Clone + Send + Serialize + DeserializeOwned + 'static;

    /// Given an object, split it into units of work that can be fetched and decoded in
    /// parallel.
    fn split_work<S: OneshotSource + Send>(
        &self,
        source: S,
        object: S::Object,
        checksum: S::Checksum,
    ) -> impl Future<Output = Result<Vec<Self::WorkRequest<S>>, StorageErrorX>> + Send;

    /// Fetch a single unit of work, returning chunks of records.
    fn fetch_work<'a, S: OneshotSource + Sync + 'static>(
        &'a self,
        source: &'a S,
        request: Self::WorkRequest<S>,
    ) -> BoxStream<'a, Result<Self::RecordChunk, StorageErrorX>>;

    /// Decode a chunk of records into `rows`, returning the number of decoded rows.
    fn decode_chunk(
        &self,
        chunk: Self::RecordChunk,
        rows: &mut Vec<Row>,
    ) -> Result<usize, StorageErrorX>;
}

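/// An enum wrapper around our supported [`OneshotFormat`]s.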
#[derive(Clone, Debug)]
pub(crate) enum FormatKind {
    Csv(CsvDecoder),
    Parquet(ParquetFormat),
}

impl OneshotFormat for FormatKind {
    type WorkRequest<S>
        = RequestKind<S::Object, S::Checksum>
    where
        S: OneshotSource;
    type RecordChunk = RecordChunkKind;

    async fn split_work<S: OneshotSource + Send>(
        &self,
        source: S,
        object: S::Object,
        checksum: S::Checksum,
    ) -> Result<Vec<Self::WorkRequest<S>>, StorageErrorX> {
        match self {
            FormatKind::Csv(csv) => {
                let work = csv
                    .split_work(source, object, checksum)
                    .await
                    .context("csv")?
                    .into_iter()
                    .map(RequestKind::Csv)
                    .collect();
                Ok(work)
            }
            FormatKind::Parquet(parquet) => {
                let work = parquet
                    .split_work(source, object, checksum)
                    .await
                    .context("parquet")?
                    .into_iter()
                    .map(RequestKind::Parquet)
                    .collect();
                Ok(work)
            }
        }
    }

    fn fetch_work<'a, S: OneshotSource + Sync + 'static>(
        &'a self,
        source: &'a S,
        request: Self::WorkRequest<S>,
    ) -> BoxStream<'a, Result<Self::RecordChunk, StorageErrorX>> {
        match (self, request) {
            (FormatKind::Csv(csv), RequestKind::Csv(request)) => csv
                .fetch_work(source, request)
                .map_ok(RecordChunkKind::Csv)
                .map(|result| result.context("csv"))
                .boxed(),
            (FormatKind::Parquet(parquet), RequestKind::Parquet(request)) => parquet
                .fetch_work(source, request)
                .map_ok(RecordChunkKind::Parquet)
                .map(|result| result.context("parquet"))
                .boxed(),
            (FormatKind::Parquet(_), RequestKind::Csv(_))
            | (FormatKind::Csv(_), RequestKind::Parquet(_)) => {
                unreachable!("programming error, {self:?}")
            }
        }
    }

    fn decode_chunk(
        &self,
        chunk: Self::RecordChunk,
        rows: &mut Vec<Row>,
    ) -> Result<usize, StorageErrorX> {
        match (self, chunk) {
            (FormatKind::Csv(csv), RecordChunkKind::Csv(chunk)) => {
                csv.decode_chunk(chunk, rows).context("csv")
            }
            (FormatKind::Parquet(parquet), RecordChunkKind::Parquet(chunk)) => {
                parquet.decode_chunk(chunk, rows).context("parquet")
            }
            (FormatKind::Parquet(_), RecordChunkKind::Csv(_))
            | (FormatKind::Csv(_), RecordChunkKind::Parquet(_)) => {
                unreachable!("programming error, {self:?}")
            }
        }
    }
}

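/// An enum wrapper around the work requests of our supported [`OneshotFormat`]s.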
#[derive(Clone, Debug, Serialize, Deserialize)]
pub(crate) enum RequestKind<O, C> {
    Csv(CsvWorkRequest<O, C>),
    Parquet(ParquetWorkRequest<O, C>),
}

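/// An enum wrapper around the record chunks of our supported [`OneshotFormat`]s.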
#[derive(Clone, Debug, Serialize, Deserialize)]
pub(crate) enum RecordChunkKind {
    Csv(CsvRecord),
    Parquet(ParquetRowGroup),
}

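/// Filter determining which discovered objects should be ingested, derived from a
/// [`ContentFilter`].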
pub(crate) enum ObjectFilter {
    None,
    Files(BTreeSet<Box<str>>),
    Pattern(glob::Pattern),
}

impl ObjectFilter {
    pub fn try_new(filter: ContentFilter) -> Result<Self, anyhow::Error> {
        match filter {
            ContentFilter::None => Ok(ObjectFilter::None),
            ContentFilter::Files(files) => {
                let files = files.into_iter().map(|f| f.into()).collect();
                Ok(ObjectFilter::Files(files))
            }
            ContentFilter::Pattern(pattern) => {
                let pattern = glob::Pattern::new(&pattern)?;
                Ok(ObjectFilter::Pattern(pattern))
            }
        }
    }

    pub fn filter<S: OneshotSource>(&self, object: &S::Object) -> bool {
        match self {
            ObjectFilter::None => true,
            ObjectFilter::Files(files) => files.contains(object.path()),
            ObjectFilter::Pattern(pattern) => pattern.matches(object.path()),
        }
    }
}

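/// Error type for oneshot sources: a [`StorageErrorXKind`] plus a list of
/// human-readable context strings that grows, via [`StorageErrorXContext::context`],
/// as the error propagates up the pipeline.
///
/// For example, an error surfaced while fetching work might display roughly as:
///
/// ```text
/// error: csv decoding error: <details>
/// causes: ["fetch worker", "fetch work"]
/// ```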
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct StorageErrorX {
    kind: StorageErrorXKind,
    context: LinkedList<String>,
}

impl fmt::Display for StorageErrorX {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        writeln!(f, "error: {}", self.kind)?;
        writeln!(f, "causes: {:?}", self.context)?;
        Ok(())
    }
}

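/// The kinds of errors that can occur during a oneshot ingestion.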
#[derive(Debug, Clone, Serialize, Deserialize, thiserror::Error)]
pub enum StorageErrorXKind {
    #[error("csv decoding error: {0}")]
    CsvDecoding(Arc<str>),
    #[error("parquet error: {0}")]
    ParquetError(Arc<str>),
    #[error("reqwest error: {0}")]
    Reqwest(Arc<str>),
    #[error("aws s3 request error: {0}")]
    AwsS3Request(String),
    #[error("aws s3 bytestream error: {0}")]
    AwsS3Bytes(Arc<str>),
    #[error("invalid reqwest header: {0}")]
    InvalidHeader(Arc<str>),
    #[error("failed to decode Row from a record batch: {0}")]
    InvalidRecordBatch(Arc<str>),
    #[error("programming error: {0}")]
    ProgrammingError(Arc<str>),
    #[error("failed to get the size of an object")]
    MissingSize,
    #[error("object is missing the required '{0}' field")]
    MissingField(Arc<str>),
    #[error("failed while evaluating the provided mfp: '{0}'")]
    MfpEvalError(Arc<str>),
    #[error("no matching files found at the given url")]
    NoMatchingFiles,
    #[error("something went wrong: {0}")]
    Generic(String),
}

impl From<csv_async::Error> for StorageErrorXKind {
    fn from(err: csv_async::Error) -> Self {
        StorageErrorXKind::CsvDecoding(err.to_string().into())
    }
}

impl From<reqwest::Error> for StorageErrorXKind {
    fn from(err: reqwest::Error) -> Self {
        StorageErrorXKind::Reqwest(err.to_string().into())
    }
}

impl From<reqwest::header::ToStrError> for StorageErrorXKind {
    fn from(err: reqwest::header::ToStrError) -> Self {
        StorageErrorXKind::InvalidHeader(err.to_string().into())
    }
}

impl From<aws_smithy_types::byte_stream::error::Error> for StorageErrorXKind {
    fn from(err: aws_smithy_types::byte_stream::error::Error) -> Self {
        StorageErrorXKind::AwsS3Request(err.to_string())
    }
}

impl From<::parquet::errors::ParquetError> for StorageErrorXKind {
    fn from(err: ::parquet::errors::ParquetError) -> Self {
        StorageErrorXKind::ParquetError(err.to_string().into())
    }
}

impl StorageErrorXKind {
    pub fn with_context<C: Display>(self, context: C) -> StorageErrorX {
        StorageErrorX {
            kind: self,
            context: LinkedList::from([context.to_string()]),
        }
    }

    pub fn invalid_record_batch<S: Into<Arc<str>>>(error: S) -> StorageErrorXKind {
        StorageErrorXKind::InvalidRecordBatch(error.into())
    }

    pub fn generic<C: Display>(error: C) -> StorageErrorXKind {
        StorageErrorXKind::Generic(error.to_string())
    }

    pub fn programming_error<S: Into<Arc<str>>>(error: S) -> StorageErrorXKind {
        StorageErrorXKind::ProgrammingError(error.into())
    }
}

impl<E> From<E> for StorageErrorX
where
    E: Into<StorageErrorXKind>,
{
    fn from(err: E) -> Self {
        StorageErrorX {
            kind: err.into(),
            context: LinkedList::new(),
        }
    }
}

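/// Helper trait for adding context to a [`Result`] whose error type can be converted
/// into a [`StorageErrorX`].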
trait StorageErrorXContext<T> {
    fn context<C>(self, context: C) -> Result<T, StorageErrorX>
    where
        C: Display;
}

impl<T, E> StorageErrorXContext<T> for Result<T, E>
where
    E: Into<StorageErrorXKind>,
{
    fn context<C>(self, context: C) -> Result<T, StorageErrorX>
    where
        C: Display,
    {
        match self {
            Ok(val) => Ok(val),
            Err(kind) => Err(StorageErrorX {
                kind: kind.into(),
                context: LinkedList::from([context.to_string()]),
            }),
        }
    }
}

impl<T> StorageErrorXContext<T> for Result<T, StorageErrorX> {
    fn context<C>(self, context: C) -> Result<T, StorageErrorX>
    where
        C: Display,
    {
        match self {
            Ok(val) => Ok(val),
            Err(mut e) => {
                e.context.push_back(context.to_string());
                Err(e)
            }
        }
    }
}