use std::collections::{BTreeSet, LinkedList};
use std::fmt::{self, Debug, Display};
use std::future::Future;
use std::sync::Arc;

use bytes::Bytes;
use differential_dataflow::Hashable;
use futures::stream::BoxStream;
use futures::{StreamExt, TryStreamExt};
use mz_expr::SafeMfpPlan;
use mz_ore::cast::CastFrom;
use mz_persist_client::Diagnostics;
use mz_persist_client::batch::ProtoBatch;
use mz_persist_client::cache::PersistClientCache;
use mz_persist_types::codec_impls::UnitSchema;
use mz_repr::{DatumVec, GlobalId, Row, RowArena, Timestamp};
use mz_storage_types::StorageDiff;
use mz_storage_types::connections::ConnectionContext;
use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::oneshot_sources::{
    ContentFilter, ContentFormat, ContentSource, OneshotIngestionRequest,
};
use mz_storage_types::sources::SourceData;
use mz_timely_util::builder_async::{
    Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
};
use mz_timely_util::pact::Distribute;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use timely::container::CapacityContainerBuilder;
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::{Scope, Stream as TimelyStream};
use timely::progress::Antichain;
use tracing::info;

use crate::oneshot_source::aws_source::{AwsS3Source, S3Checksum, S3Object};
use crate::oneshot_source::csv::{CsvDecoder, CsvRecord, CsvWorkRequest};
use crate::oneshot_source::http_source::{HttpChecksum, HttpObject, HttpOneshotSource};
use crate::oneshot_source::parquet::{ParquetFormat, ParquetRowGroup, ParquetWorkRequest};

pub mod csv;
pub mod parquet;

pub mod aws_source;
pub mod http_source;

mod util;

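/// Render a dataflow that does a "oneshot" ingestion, copying data from an external system
/// (e.g. an HTTP server or an S3 bucket) into a staged Persist batch.
///
/// Roughly, the operators we render do the following:
///
/// 1. Discover the objects to ingest, on a single worker.
/// 2. Split each object into individual units of work, based on the requested format.
/// 3. Distribute the work amongst all workers and fetch chunks of records.
/// 4. Decode the chunks into [`Row`]s and apply the source's MFP.
/// 5. Stage the [`Row`]s into Persist, producing a [`ProtoBatch`].
/// 6. Report the staged batch (or any error) back to the caller via `worker_callback`.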
pub fn render<G, F>(
    scope: G,
    persist_clients: Arc<PersistClientCache>,
    connection_context: ConnectionContext,
    collection_id: GlobalId,
    collection_meta: CollectionMetadata,
    request: OneshotIngestionRequest,
    worker_callback: F,
) -> Vec<PressOnDropButton>
where
    G: Scope<Timestamp = Timestamp>,
    F: FnOnce(Result<Option<ProtoBatch>, String>) -> () + 'static,
{
    let OneshotIngestionRequest {
        source,
        format,
        filter,
        shape,
    } = request;

    let source = match source {
        ContentSource::Http { url } => {
            let source = HttpOneshotSource::new(reqwest::Client::default(), url);
            SourceKind::Http(source)
        }
        ContentSource::AwsS3 {
            connection,
            connection_id,
            uri,
        } => {
            let source = AwsS3Source::new(connection, connection_id, connection_context, uri);
            SourceKind::AwsS3(source)
        }
    };
    tracing::info!(?source, "created oneshot source");

    let format = match format {
        ContentFormat::Csv(params) => {
            let format = CsvDecoder::new(params, &shape.source_desc);
            FormatKind::Csv(format)
        }
        ContentFormat::Parquet => {
            let format = ParquetFormat::new(shape.source_desc);
            FormatKind::Parquet(format)
        }
    };

    // Discover what objects are available to copy.
    let (objects_stream, discover_token) =
        render_discover_objects(scope.clone(), collection_id, source.clone(), filter);
    // Split the objects into individual units of work.
    let (work_stream, split_token) = render_split_work(
        scope.clone(),
        collection_id,
        &objects_stream,
        source.clone(),
        format.clone(),
    );
    // Fetch each unit of work, returning chunks of records.
    let (records_stream, fetch_token) = render_fetch_work(
        scope.clone(),
        collection_id,
        source.clone(),
        format.clone(),
        &work_stream,
    );
    // Decode the chunks of records into Rows.
    let (rows_stream, decode_token) = render_decode_chunk(
        scope.clone(),
        format.clone(),
        &records_stream,
        shape.source_mfp,
    );
    // Stage the Rows in Persist.
    let (batch_stream, batch_token) = render_stage_batches_operator(
        scope.clone(),
        collection_id,
        &collection_meta,
        persist_clients,
        &rows_stream,
    );

    // Report the staged batches back to the caller via the callback.
    render_completion_operator(scope, &batch_stream, worker_callback);

    let tokens = vec![
        discover_token,
        split_token,
        fetch_token,
        decode_token,
        batch_token,
    ];

    tokens
}

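/// Render an operator that, on a single worker, lists the objects available from the
/// [`OneshotSource`], applies the provided [`ContentFilter`], and emits an
/// `(object, checksum)` pair for every object that should be ingested.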
pub fn render_discover_objects<G, S>(
    scope: G,
    collection_id: GlobalId,
    source: S,
    filter: ContentFilter,
) -> (
    TimelyStream<G, Result<(S::Object, S::Checksum), StorageErrorX>>,
    PressOnDropButton,
)
where
    G: Scope<Timestamp = Timestamp>,
    S: OneshotSource + 'static,
{
    // Only a single worker is responsible for discovering objects.
    let worker_id = scope.index();
    let num_workers = scope.peers();
    let active_worker_id = usize::cast_from((collection_id, "discover").hashed()) % num_workers;
    let is_active_worker = worker_id == active_worker_id;

    let mut builder = AsyncOperatorBuilder::new("CopyFrom-discover".to_string(), scope.clone());

    let (start_handle, start_stream) = builder.new_output::<CapacityContainerBuilder<_>>();

    let shutdown = builder.build(move |caps| async move {
        let [start_cap] = caps.try_into().unwrap();

        if !is_active_worker {
            return;
        }

        let filter = match ObjectFilter::try_new(filter) {
            Ok(filter) => filter,
            Err(err) => {
                tracing::warn!(?err, "failed to create filter");
                start_handle.give(&start_cap, Err(StorageErrorXKind::generic(err).into()));
                return;
            }
        };

        let work = source.list().await.context("list");
        match work {
            Ok(objects) => {
                let (include, exclude): (Vec<_>, Vec<_>) = objects
                    .into_iter()
                    .partition(|(o, _checksum)| filter.filter::<S>(o));
                tracing::info!(%worker_id, ?include, ?exclude, "listed objects");

                include
                    .into_iter()
                    .for_each(|object| start_handle.give(&start_cap, Ok(object)))
            }
            Err(err) => {
                tracing::warn!(?err, "failed to list oneshot source");
                start_handle.give(&start_cap, Err(err))
            }
        }
    });

    (start_stream, shutdown.press_on_drop())
}

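/// Render an operator that uses the [`OneshotFormat`] to split each discovered object into
/// individual work requests that can be fetched independently.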
pub fn render_split_work<G, S, F>(
    scope: G,
    collection_id: GlobalId,
    objects: &TimelyStream<G, Result<(S::Object, S::Checksum), StorageErrorX>>,
    source: S,
    format: F,
) -> (
    TimelyStream<G, Result<F::WorkRequest<S>, StorageErrorX>>,
    PressOnDropButton,
)
where
    G: Scope,
    S: OneshotSource + Send + Sync + 'static,
    F: OneshotFormat + Send + Sync + 'static,
{
    let worker_id = scope.index();
    let mut builder = AsyncOperatorBuilder::new("CopyFrom-split_work".to_string(), scope.clone());

    let (request_handle, request_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
    let mut objects_handle = builder.new_input_for(objects, Distribute, &request_handle);

    let shutdown = builder.build(move |caps| async move {
        let [_objects_cap] = caps.try_into().unwrap();

        info!(%collection_id, %worker_id, "CopyFrom Split Work");

        while let Some(event) = objects_handle.next().await {
            let (capability, maybe_objects) = match event {
                AsyncEvent::Data(cap, req) => (cap, req),
                AsyncEvent::Progress(_) => continue,
            };

            let result = async {
                let mut requests = Vec::new();

                for maybe_object in maybe_objects {
                    let (object, checksum) = maybe_object?;

                    let format_ = format.clone();
                    let source_ = source.clone();
                    let work_requests = mz_ore::task::spawn(|| "split-work", async move {
                        info!(%worker_id, object = %object.name(), "splitting object");
                        format_.split_work(source_.clone(), object, checksum).await
                    })
                    .await?;

                    requests.extend(work_requests);
                }

                Ok::<_, StorageErrorX>(requests)
            }
            .await
            .context("split");

            match result {
                Ok(requests) => requests
                    .into_iter()
                    .for_each(|req| request_handle.give(&capability, Ok(req))),
                Err(err) => request_handle.give(&capability, Err(err)),
            }
        }
    });

    (request_stream, shutdown.press_on_drop())
}

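/// Render an operator that distributes work requests amongst all workers, fetching the data
/// for each request as chunks of records.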
pub fn render_fetch_work<G, S, F>(
    scope: G,
    collection_id: GlobalId,
    source: S,
    format: F,
    work_requests: &TimelyStream<G, Result<F::WorkRequest<S>, StorageErrorX>>,
) -> (
    TimelyStream<G, Result<F::RecordChunk, StorageErrorX>>,
    PressOnDropButton,
)
where
    G: Scope,
    S: OneshotSource + Sync + 'static,
    F: OneshotFormat + Sync + 'static,
{
    let worker_id = scope.index();
    let mut builder = AsyncOperatorBuilder::new("CopyFrom-fetch_work".to_string(), scope.clone());

    let (record_handle, record_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
    let mut work_requests_handle = builder.new_input_for(work_requests, Distribute, &record_handle);

    let shutdown = builder.build(move |caps| async move {
        let [_work_cap] = caps.try_into().unwrap();

        info!(%collection_id, %worker_id, "CopyFrom Fetch Work");

        while let Some(event) = work_requests_handle.next().await {
            let (capability, maybe_requests) = match event {
                AsyncEvent::Data(cap, req) => (cap, req),
                AsyncEvent::Progress(_) => continue,
            };

            let result = async {
                for maybe_request in maybe_requests {
                    let request = maybe_request?;

                    let mut work_stream = format.fetch_work(&source, request);
                    while let Some(result) = work_stream.next().await {
                        let record_chunk = result.context("fetch worker")?;

                        record_handle.give(&capability, Ok(record_chunk));
                    }
                }

                Ok::<_, StorageErrorX>(())
            }
            .await
            .context("fetch work");

            if let Err(err) = result {
                tracing::warn!(?err, "failed to fetch");
                record_handle.give(&capability, Err(err))
            }
        }
    });

    (record_stream, shutdown.press_on_drop())
}

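/// Render an operator that decodes chunks of records into [`Row`]s and applies the provided
/// [`SafeMfpPlan`] to each one.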
pub fn render_decode_chunk<G, F>(
    scope: G,
    format: F,
    record_chunks: &TimelyStream<G, Result<F::RecordChunk, StorageErrorX>>,
    mfp: SafeMfpPlan,
) -> (
    TimelyStream<G, Result<Row, StorageErrorX>>,
    PressOnDropButton,
)
where
    G: Scope,
    F: OneshotFormat + 'static,
{
    let mut builder = AsyncOperatorBuilder::new("CopyFrom-decode_chunk".to_string(), scope.clone());

    let (row_handle, row_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
    let mut record_chunk_handle = builder.new_input_for(record_chunks, Distribute, &row_handle);

    let shutdown = builder.build(move |caps| async move {
        let [_row_cap] = caps.try_into().unwrap();

        let mut datum_vec = DatumVec::default();
        let row_arena = RowArena::default();
        let mut row_buf = Row::default();

        while let Some(event) = record_chunk_handle.next().await {
            let (capability, maybe_chunks) = match event {
                AsyncEvent::Data(cap, data) => (cap, data),
                AsyncEvent::Progress(_) => continue,
            };

            let result = async {
                let mut rows = Vec::new();
                for maybe_chunk in maybe_chunks {
                    let chunk = maybe_chunk?;
                    format.decode_chunk(chunk, &mut rows)?;
                }
                Ok::<_, StorageErrorX>(rows)
            }
            .await
            .context("decode chunk");

            match result {
                Ok(rows) => {
                    for row in rows {
                        let mut datums = datum_vec.borrow_with(&row);
                        let result = mfp
                            .evaluate_into(&mut *datums, &row_arena, &mut row_buf)
                            .map(|row| row.cloned());

                        match result {
                            Ok(Some(row)) => row_handle.give(&capability, Ok(row)),
                            Ok(None) => {
                                // We never expect the provided MFP to filter out data.
                                mz_ore::soft_panic_or_log!("oneshot source MFP filtered out data!");
                            }
                            Err(e) => {
                                let err = StorageErrorXKind::MfpEvalError(e.to_string().into())
                                    .with_context("decode");
                                row_handle.give(&capability, Err(err))
                            }
                        }
                    }
                }
                Err(err) => row_handle.give(&capability, Err(err)),
            }
        }
    });

    (row_stream, shutdown.press_on_drop())
}

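/// Render an operator that stages all of the [`Row`]s into a single Persist batch for the
/// destination collection, emitting the resulting [`ProtoBatch`] once its input is
/// exhausted. The batch is only staged here, it is not appended to the shard.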
pub fn render_stage_batches_operator<G>(
    scope: G,
    collection_id: GlobalId,
    collection_meta: &CollectionMetadata,
    persist_clients: Arc<PersistClientCache>,
    rows_stream: &TimelyStream<G, Result<Row, StorageErrorX>>,
) -> (
    TimelyStream<G, Result<ProtoBatch, StorageErrorX>>,
    PressOnDropButton,
)
where
    G: Scope,
{
    let persist_location = collection_meta.persist_location.clone();
    let shard_id = collection_meta.data_shard;
    let collection_desc = collection_meta.relation_desc.clone();

    let mut builder =
        AsyncOperatorBuilder::new("CopyFrom-stage_batches".to_string(), scope.clone());

    let (proto_batch_handle, proto_batch_stream) =
        builder.new_output::<CapacityContainerBuilder<_>>();
    let mut rows_handle = builder.new_input_for(rows_stream, Pipeline, &proto_batch_handle);

    let shutdown = builder.build(move |caps| async move {
        let [proto_batch_cap] = caps.try_into().unwrap();

        // Open a Persist writer so we can stage a batch for the destination collection.
        let persist_client = persist_clients
            .open(persist_location)
            .await
            .expect("failed to open Persist client");
        let persist_diagnostics = Diagnostics {
            shard_name: collection_id.to_string(),
            handle_purpose: "CopyFrom::stage_batches".to_string(),
        };
        let write_handle = persist_client
            .open_writer::<SourceData, (), mz_repr::Timestamp, StorageDiff>(
                shard_id,
                Arc::new(collection_desc),
                Arc::new(UnitSchema),
                persist_diagnostics,
            )
            .await
            .expect("could not open Persist shard");

        let lower = mz_repr::Timestamp::MIN;
        let upper = Antichain::from_elem(lower.step_forward());

        let mut batch_builder = write_handle.builder(Antichain::from_elem(lower));

        while let Some(event) = rows_handle.next().await {
            let AsyncEvent::Data(_, row_batch) = event else {
                continue;
            };

            for maybe_row in row_batch {
                match maybe_row {
                    Ok(row) => {
                        let data = SourceData(Ok(row));
                        batch_builder
                            .add(&data, &(), &lower, &1)
                            .await
                            .expect("failed to add Row to batch");
                    }
                    Err(err) => {
                        // Clean up our in-progress batch so we don't leak it, then
                        // report the error downstream.
                        let batch = batch_builder
                            .finish(upper)
                            .await
                            .expect("failed to cleanup batch");
                        batch.delete().await;

                        proto_batch_handle
                            .give(&proto_batch_cap, Err(err).context("stage batches"));
                        return;
                    }
                }
            }
        }

        let batch = batch_builder
            .finish(upper)
            .await
            .expect("failed to create Batch");

        // Emit the finished batch downstream as a transmittable ProtoBatch.
        let proto_batch = batch.into_transmittable_batch();
        proto_batch_handle.give(&proto_batch_cap, Ok(proto_batch));
    });

    (proto_batch_stream, shutdown.press_on_drop())
}

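/// Render an operator that waits for the staged [`ProtoBatch`] (or an error) and reports the
/// result back to the caller via `worker_callback`. Workers whose result stream is empty
/// report `Ok(None)`.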
pub fn render_completion_operator<G, F>(
    scope: G,
    results_stream: &TimelyStream<G, Result<ProtoBatch, StorageErrorX>>,
    worker_callback: F,
) where
    G: Scope,
    F: FnOnce(Result<Option<ProtoBatch>, String>) -> () + 'static,
{
    let mut builder = AsyncOperatorBuilder::new("CopyFrom-completion".to_string(), scope.clone());
    let mut results_input = builder.new_disconnected_input(results_stream, Pipeline);

    builder.build(move |_| async move {
        let result = async move {
            let mut maybe_payload: Option<ProtoBatch> = None;

            while let Some(event) = results_input.next().await {
                if let AsyncEvent::Data(_cap, results) = event {
                    let [result] = results
                        .try_into()
                        .expect("only 1 event on the result stream");

                    if maybe_payload.is_some() {
                        // We only ever expect a single batch per worker.
                        panic!("expected only one batch!");
                    }

                    maybe_payload = Some(result.map_err(|e| e.to_string())?);
                }
            }

            Ok(maybe_payload)
        }
        .await;

        // Report the result of this worker's rendering back to the caller.
        worker_callback(result);
    });
}

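/// An object (e.g. a file) discovered by a [`OneshotSource`].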
pub trait OneshotObject {
    /// Name of the object, e.g. the file name.
    fn name(&self) -> &str;

    /// Path of the object within the source.
    fn path(&self) -> &str;

    /// Size of the object, in bytes.
    fn size(&self) -> usize;

    /// Encodings applied to the object's contents.
    fn encodings(&self) -> &[Encoding];
}

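/// Compression applied to the contents of an [`OneshotObject`].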
#[derive(Debug, Copy, Clone, Serialize, Deserialize)]
pub enum Encoding {
    Bzip2,
    Gzip,
    Xz,
    Zstd,
}

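/// Defines an external system (e.g. an HTTP server or an S3 bucket) that we can fetch data
/// from for a "oneshot" ingestion.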
pub trait OneshotSource: Clone + Send + Unpin {
    /// An individual unit within the source, e.g. a file.
    type Object: OneshotObject
        + Debug
        + Clone
        + Send
        + Unpin
        + Serialize
        + DeserializeOwned
        + 'static;
    /// Checksum for validating the contents of an object.
    type Checksum: Debug + Clone + Send + Unpin + Serialize + DeserializeOwned + 'static;

    /// List all of the objects (and their checksums) available from this source.
    fn list<'a>(
        &'a self,
    ) -> impl Future<Output = Result<Vec<(Self::Object, Self::Checksum)>, StorageErrorX>> + Send;

    /// Stream the raw bytes of `object`, optionally restricted to the provided byte `range`.
    fn get<'s>(
        &'s self,
        object: Self::Object,
        checksum: Self::Checksum,
        range: Option<std::ops::RangeInclusive<usize>>,
    ) -> BoxStream<'s, Result<Bytes, StorageErrorX>>;
}

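/// An enum wrapper around [`OneshotSource`]s, so the rendered operators can work with a
/// single concrete type regardless of which source the user requested.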
#[derive(Clone, Debug)]
pub(crate) enum SourceKind {
    Http(HttpOneshotSource),
    AwsS3(AwsS3Source),
}

impl OneshotSource for SourceKind {
    type Object = ObjectKind;
    type Checksum = ChecksumKind;

    async fn list<'a>(&'a self) -> Result<Vec<(Self::Object, Self::Checksum)>, StorageErrorX> {
        match self {
            SourceKind::Http(http) => {
                let objects = http.list().await.context("http")?;
                let objects = objects
                    .into_iter()
                    .map(|(object, checksum)| {
                        (ObjectKind::Http(object), ChecksumKind::Http(checksum))
                    })
                    .collect();
                Ok(objects)
            }
            SourceKind::AwsS3(s3) => {
                let objects = s3.list().await.context("s3")?;
                let objects = objects
                    .into_iter()
                    .map(|(object, checksum)| {
                        (ObjectKind::AwsS3(object), ChecksumKind::AwsS3(checksum))
                    })
                    .collect();
                Ok(objects)
            }
        }
    }

    fn get<'s>(
        &'s self,
        object: Self::Object,
        checksum: Self::Checksum,
        range: Option<std::ops::RangeInclusive<usize>>,
    ) -> BoxStream<'s, Result<Bytes, StorageErrorX>> {
        match (self, object, checksum) {
            (SourceKind::Http(http), ObjectKind::Http(object), ChecksumKind::Http(checksum)) => {
                http.get(object, checksum, range)
                    .map(|result| result.context("http"))
                    .boxed()
            }
            (SourceKind::AwsS3(s3), ObjectKind::AwsS3(object), ChecksumKind::AwsS3(checksum)) => s3
                .get(object, checksum, range)
                .map(|result| result.context("aws_s3"))
                .boxed(),
            (SourceKind::AwsS3(_) | SourceKind::Http(_), _, _) => {
                unreachable!("programming error! wrong source, object, and checksum kind");
            }
        }
    }
}

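/// Enum wrapper around the object types of our supported [`SourceKind`]s.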
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) enum ObjectKind {
    Http(HttpObject),
    AwsS3(S3Object),
}

impl OneshotObject for ObjectKind {
    fn name(&self) -> &str {
        match self {
            ObjectKind::Http(object) => object.name(),
            ObjectKind::AwsS3(object) => object.name(),
        }
    }

    fn path(&self) -> &str {
        match self {
            ObjectKind::Http(object) => object.path(),
            ObjectKind::AwsS3(object) => object.path(),
        }
    }

    fn size(&self) -> usize {
        match self {
            ObjectKind::Http(object) => object.size(),
            ObjectKind::AwsS3(object) => object.size(),
        }
    }

    fn encodings(&self) -> &[Encoding] {
        match self {
            ObjectKind::Http(object) => object.encodings(),
            ObjectKind::AwsS3(object) => object.encodings(),
        }
    }
}

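/// Enum wrapper around the checksum types of our supported [`SourceKind`]s.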
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) enum ChecksumKind {
    Http(HttpChecksum),
    AwsS3(S3Checksum),
}

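/// Defines a format (e.g. CSV or Parquet) that we can decode for a "oneshot" ingestion. A
/// format describes how to split an object into units of work, how to fetch those units, and
/// how to decode the fetched chunks into [`Row`]s.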
pub trait OneshotFormat: Clone {
    /// A single unit of work for fetching data from a [`OneshotSource`].
    type WorkRequest<S>: Debug + Clone + Send + Serialize + DeserializeOwned + 'static
    where
        S: OneshotSource;
    /// A chunk of fetched records that can be decoded into [`Row`]s.
    type RecordChunk: Debug + Clone + Send + Serialize + DeserializeOwned + 'static;

    /// Split an object into one or more units of work that can be fetched independently.
    fn split_work<S: OneshotSource + Send>(
        &self,
        source: S,
        object: S::Object,
        checksum: S::Checksum,
    ) -> impl Future<Output = Result<Vec<Self::WorkRequest<S>>, StorageErrorX>> + Send;

    /// Fetch a single unit of work, returning a stream of record chunks.
    fn fetch_work<'a, S: OneshotSource + Sync + 'static>(
        &'a self,
        source: &'a S,
        request: Self::WorkRequest<S>,
    ) -> BoxStream<'a, Result<Self::RecordChunk, StorageErrorX>>;

    /// Decode a chunk of records into `rows`, returning the number of records decoded.
    fn decode_chunk(
        &self,
        chunk: Self::RecordChunk,
        rows: &mut Vec<Row>,
    ) -> Result<usize, StorageErrorX>;
}

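/// An enum wrapper around [`OneshotFormat`]s, so the rendered operators can work with a
/// single concrete type regardless of which format the user requested.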
#[derive(Clone, Debug)]
pub(crate) enum FormatKind {
    Csv(CsvDecoder),
    Parquet(ParquetFormat),
}

impl OneshotFormat for FormatKind {
    type WorkRequest<S>
        = RequestKind<S::Object, S::Checksum>
    where
        S: OneshotSource;
    type RecordChunk = RecordChunkKind;

    async fn split_work<S: OneshotSource + Send>(
        &self,
        source: S,
        object: S::Object,
        checksum: S::Checksum,
    ) -> Result<Vec<Self::WorkRequest<S>>, StorageErrorX> {
        match self {
            FormatKind::Csv(csv) => {
                let work = csv
                    .split_work(source, object, checksum)
                    .await
                    .context("csv")?
                    .into_iter()
                    .map(RequestKind::Csv)
                    .collect();
                Ok(work)
            }
            FormatKind::Parquet(parquet) => {
                let work = parquet
                    .split_work(source, object, checksum)
                    .await
                    .context("parquet")?
                    .into_iter()
                    .map(RequestKind::Parquet)
                    .collect();
                Ok(work)
            }
        }
    }

    fn fetch_work<'a, S: OneshotSource + Sync + 'static>(
        &'a self,
        source: &'a S,
        request: Self::WorkRequest<S>,
    ) -> BoxStream<'a, Result<Self::RecordChunk, StorageErrorX>> {
        match (self, request) {
            (FormatKind::Csv(csv), RequestKind::Csv(request)) => csv
                .fetch_work(source, request)
                .map_ok(RecordChunkKind::Csv)
                .map(|result| result.context("csv"))
                .boxed(),
            (FormatKind::Parquet(parquet), RequestKind::Parquet(request)) => parquet
                .fetch_work(source, request)
                .map_ok(RecordChunkKind::Parquet)
                .map(|result| result.context("parquet"))
                .boxed(),
            (FormatKind::Parquet(_), RequestKind::Csv(_))
            | (FormatKind::Csv(_), RequestKind::Parquet(_)) => {
                unreachable!("programming error, {self:?}")
            }
        }
    }

    fn decode_chunk(
        &self,
        chunk: Self::RecordChunk,
        rows: &mut Vec<Row>,
    ) -> Result<usize, StorageErrorX> {
        match (self, chunk) {
            (FormatKind::Csv(csv), RecordChunkKind::Csv(chunk)) => {
                csv.decode_chunk(chunk, rows).context("csv")
            }
            (FormatKind::Parquet(parquet), RecordChunkKind::Parquet(chunk)) => {
                parquet.decode_chunk(chunk, rows).context("parquet")
            }
            (FormatKind::Parquet(_), RecordChunkKind::Csv(_))
            | (FormatKind::Csv(_), RecordChunkKind::Parquet(_)) => {
                unreachable!("programming error, {self:?}")
            }
        }
    }
}

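/// Work requests for the formats supported by [`FormatKind`].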
#[derive(Clone, Debug, Serialize, Deserialize)]
pub(crate) enum RequestKind<O, C> {
    Csv(CsvWorkRequest<O, C>),
    Parquet(ParquetWorkRequest<O, C>),
}

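/// Record chunks for the formats supported by [`FormatKind`].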
#[derive(Clone, Debug, Serialize, Deserialize)]
pub(crate) enum RecordChunkKind {
    Csv(CsvRecord),
    Parquet(ParquetRowGroup),
}

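/// Filter that determines which discovered objects should be ingested.
///
/// A minimal sketch of how the filter is used (the glob below is a hypothetical value, and we
/// assume `ContentFilter::Pattern` carries a `String`; marked `ignore` because the surrounding
/// types live in other crates):
///
/// ```ignore
/// let filter = ObjectFilter::try_new(ContentFilter::Pattern("*.csv".to_string()))?;
/// // Keeps only objects whose path matches the glob.
/// let keep = filter.filter::<SourceKind>(&object);
/// ```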
pub(crate) enum ObjectFilter {
    None,
    Files(BTreeSet<Box<str>>),
    Pattern(glob::Pattern),
}

impl ObjectFilter {
    pub fn try_new(filter: ContentFilter) -> Result<Self, anyhow::Error> {
        match filter {
            ContentFilter::None => Ok(ObjectFilter::None),
            ContentFilter::Files(files) => {
                let files = files.into_iter().map(|f| f.into()).collect();
                Ok(ObjectFilter::Files(files))
            }
            ContentFilter::Pattern(pattern) => {
                let pattern = glob::Pattern::new(&pattern)?;
                Ok(ObjectFilter::Pattern(pattern))
            }
        }
    }

    /// Returns whether `object` should be included in the ingestion.
    pub fn filter<S: OneshotSource>(&self, object: &S::Object) -> bool {
        match self {
            ObjectFilter::None => true,
            ObjectFilter::Files(files) => files.contains(object.path()),
            ObjectFilter::Pattern(pattern) => pattern.matches(object.path()),
        }
    }
}

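/// Error type for "oneshot" ingestions that pairs a [`StorageErrorXKind`] with a chain of
/// human-readable context, added via the `context` helper methods in this module.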
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct StorageErrorX {
    kind: StorageErrorXKind,
    context: LinkedList<String>,
}

impl fmt::Display for StorageErrorX {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        writeln!(f, "error: {}", self.kind)?;
        writeln!(f, "causes: {:?}", self.context)?;
        Ok(())
    }
}

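/// The specific kind of error encountered during a "oneshot" ingestion.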
#[derive(Debug, Clone, Serialize, Deserialize, thiserror::Error)]
pub enum StorageErrorXKind {
    #[error("csv decoding error: {0}")]
    CsvDecoding(Arc<str>),
    #[error("parquet error: {0}")]
    ParquetError(Arc<str>),
    #[error("reqwest error: {0}")]
    Reqwest(Arc<str>),
    #[error("aws s3 request error: {0}")]
    AwsS3Request(String),
    #[error("aws s3 bytestream error: {0}")]
    AwsS3Bytes(Arc<str>),
    #[error("invalid reqwest header: {0}")]
    InvalidHeader(Arc<str>),
    #[error("failed to decode Row from a record batch: {0}")]
    InvalidRecordBatch(Arc<str>),
    #[error("programming error: {0}")]
    ProgrammingError(Arc<str>),
    #[error("failed to get the size of an object")]
    MissingSize,
    #[error("object is missing the required '{0}' field")]
    MissingField(Arc<str>),
    #[error("failed while evaluating the provided mfp: '{0}'")]
    MfpEvalError(Arc<str>),
    #[error("something went wrong: {0}")]
    Generic(String),
}

impl From<csv_async::Error> for StorageErrorXKind {
    fn from(err: csv_async::Error) -> Self {
        StorageErrorXKind::CsvDecoding(err.to_string().into())
    }
}

impl From<reqwest::Error> for StorageErrorXKind {
    fn from(err: reqwest::Error) -> Self {
        StorageErrorXKind::Reqwest(err.to_string().into())
    }
}

impl From<reqwest::header::ToStrError> for StorageErrorXKind {
    fn from(err: reqwest::header::ToStrError) -> Self {
        StorageErrorXKind::InvalidHeader(err.to_string().into())
    }
}

impl From<aws_smithy_types::byte_stream::error::Error> for StorageErrorXKind {
    fn from(err: aws_smithy_types::byte_stream::error::Error) -> Self {
        StorageErrorXKind::AwsS3Request(err.to_string())
    }
}

impl From<::parquet::errors::ParquetError> for StorageErrorXKind {
    fn from(err: ::parquet::errors::ParquetError) -> Self {
        StorageErrorXKind::ParquetError(err.to_string().into())
    }
}

impl StorageErrorXKind {
    pub fn with_context<C: Display>(self, context: C) -> StorageErrorX {
        StorageErrorX {
            kind: self,
            context: LinkedList::from([context.to_string()]),
        }
    }

    pub fn invalid_record_batch<S: Into<Arc<str>>>(error: S) -> StorageErrorXKind {
        StorageErrorXKind::InvalidRecordBatch(error.into())
    }

    pub fn generic<C: Display>(error: C) -> StorageErrorXKind {
        StorageErrorXKind::Generic(error.to_string())
    }

    pub fn programming_error<S: Into<Arc<str>>>(error: S) -> StorageErrorXKind {
        StorageErrorXKind::ProgrammingError(error.into())
    }
}

impl<E> From<E> for StorageErrorX
where
    E: Into<StorageErrorXKind>,
{
    fn from(err: E) -> Self {
        StorageErrorX {
            kind: err.into(),
            context: LinkedList::new(),
        }
    }
}

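/// Helper trait for attaching human-readable context to the error types used in this module,
/// similar in spirit to `anyhow::Context`.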
trait StorageErrorXContext<T> {
    fn context<C>(self, context: C) -> Result<T, StorageErrorX>
    where
        C: Display;
}

impl<T, E> StorageErrorXContext<T> for Result<T, E>
where
    E: Into<StorageErrorXKind>,
{
    fn context<C>(self, context: C) -> Result<T, StorageErrorX>
    where
        C: Display,
    {
        match self {
            Ok(val) => Ok(val),
            Err(kind) => Err(StorageErrorX {
                kind: kind.into(),
                context: LinkedList::from([context.to_string()]),
            }),
        }
    }
}

impl<T> StorageErrorXContext<T> for Result<T, StorageErrorX> {
    fn context<C>(self, context: C) -> Result<T, StorageErrorX>
    where
        C: Display,
    {
        match self {
            Ok(val) => Ok(val),
            Err(mut e) => {
                e.context.push_back(context.to_string());
                Err(e)
            }
        }
    }
}