//! Implementation of "oneshot" sources, which copy a fixed set of remote objects
//! (e.g. for `COPY FROM`) into a Persist-backed collection.

use std::fmt;
use std::sync::Arc;

use bytes::Bytes;
use differential_dataflow::Hashable;
use futures::stream::BoxStream;
use futures::{StreamExt, TryStreamExt};
use mz_expr::SafeMfpPlan;
use mz_ore::cast::CastFrom;
use mz_persist_client::Diagnostics;
use mz_persist_client::batch::ProtoBatch;
use mz_persist_client::cache::PersistClientCache;
use mz_persist_types::Codec;
use mz_persist_types::codec_impls::UnitSchema;
use mz_repr::{DatumVec, GlobalId, Row, RowArena, Timestamp};
use mz_storage_types::StorageDiff;
use mz_storage_types::connections::ConnectionContext;
use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::oneshot_sources::{
    ContentFilter, ContentFormat, ContentSource, OneshotIngestionRequest,
};
use mz_storage_types::sources::SourceData;
use mz_timely_util::builder_async::{
    Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
};
use mz_timely_util::pact::Distribute;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use std::collections::{BTreeSet, LinkedList};
use std::fmt::{Debug, Display};
use std::future::Future;
use timely::container::CapacityContainerBuilder;
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::{Scope, Stream as TimelyStream};
use timely::progress::Antichain;
use tracing::info;

use crate::oneshot_source::aws_source::{AwsS3Source, S3Checksum, S3Object};
use crate::oneshot_source::csv::{CsvDecoder, CsvRecord, CsvWorkRequest};
use crate::oneshot_source::http_source::{HttpChecksum, HttpObject, HttpOneshotSource};
use crate::oneshot_source::parquet::{ParquetFormat, ParquetRowGroup, ParquetWorkRequest};

pub mod csv;
pub mod parquet;

pub mod aws_source;
pub mod http_source;

mod util;

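/// Render a dataflow that ingests data from a oneshot source (e.g. `COPY FROM`) and stages it
/// as a Persist batch for the collection identified by `collection_id`.
///
/// Returns buttons that keep the operators alive for as long as they are held. Once a worker's
/// portion of the ingestion completes, `worker_callback` is invoked with the staged
/// [`ProtoBatch`] (if any), or an error rendered as a `String`.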
pub fn render<G, F>(
    scope: G,
    persist_clients: Arc<PersistClientCache>,
    connection_context: ConnectionContext,
    collection_id: GlobalId,
    collection_meta: CollectionMetadata,
    request: OneshotIngestionRequest,
    worker_callback: F,
) -> Vec<PressOnDropButton>
where
    G: Scope<Timestamp = Timestamp>,
    F: FnOnce(Result<Option<ProtoBatch>, String>) -> () + 'static,
{
    let OneshotIngestionRequest {
        source,
        format,
        filter,
        shape,
    } = request;

    let source = match source {
        ContentSource::Http { url } => {
            let source = HttpOneshotSource::new(reqwest::Client::default(), url);
            SourceKind::Http(source)
        }
        ContentSource::AwsS3 {
            connection,
            connection_id,
            uri,
        } => {
            let source = AwsS3Source::new(connection, connection_id, connection_context, uri);
            SourceKind::AwsS3(source)
        }
    };
    tracing::info!(?source, "created oneshot source");

    let format = match format {
        ContentFormat::Csv(params) => {
            let format = CsvDecoder::new(params, &shape.source_desc);
            FormatKind::Csv(format)
        }
        ContentFormat::Parquet => {
            let format = ParquetFormat::new(shape.source_desc);
            FormatKind::Parquet(format)
        }
    };

    // Discover what objects are available to copy.
    let (objects_stream, discover_token) =
        render_discover_objects(scope.clone(), collection_id, source.clone(), filter);
    // Split the objects into individual units of work.
    let (work_stream, split_token) = render_split_work(
        scope.clone(),
        collection_id,
        &objects_stream,
        source.clone(),
        format.clone(),
    );
    // Fetch each unit of work, returning chunks of records.
    let (records_stream, fetch_token) = render_fetch_work(
        scope.clone(),
        collection_id,
        source.clone(),
        format.clone(),
        &work_stream,
    );
    // Decode chunks of records into Rows.
    let (rows_stream, decode_token) = render_decode_chunk(
        scope.clone(),
        format.clone(),
        &records_stream,
        shape.source_mfp,
    );
    // Stage the Rows in Persist.
    let (batch_stream, batch_token) = render_stage_batches_operator(
        scope.clone(),
        collection_id,
        &collection_meta,
        persist_clients,
        &rows_stream,
    );

    // Report the final result of this worker's ingestion.
    render_completion_operator(scope, &batch_stream, worker_callback);

    let tokens = vec![
        discover_token,
        split_token,
        fetch_token,
        decode_token,
        batch_token,
    ];

    tokens
}

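/// Lists all objects in the remote `source` and applies `filter`.
///
/// Only a single worker (chosen by hashing `collection_id`) performs the listing; the resulting
/// `(object, checksum)` pairs are emitted downstream, where they are distributed across workers.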
pub fn render_discover_objects<G, S>(
    scope: G,
    collection_id: GlobalId,
    source: S,
    filter: ContentFilter,
) -> (
    TimelyStream<G, Result<(S::Object, S::Checksum), StorageErrorX>>,
    PressOnDropButton,
)
where
    G: Scope<Timestamp = Timestamp>,
    S: OneshotSource + 'static,
{
    // Only a single worker is responsible for listing the objects.
    let worker_id = scope.index();
    let num_workers = scope.peers();
    let active_worker_id = usize::cast_from((collection_id, "discover").hashed()) % num_workers;
    let is_active_worker = worker_id == active_worker_id;

    let mut builder = AsyncOperatorBuilder::new("CopyFrom-discover".to_string(), scope.clone());

    let (start_handle, start_stream) = builder.new_output::<CapacityContainerBuilder<_>>();

    let shutdown = builder.build(move |caps| async move {
        let [start_cap] = caps.try_into().unwrap();

        if !is_active_worker {
            return;
        }

        let filter = match ObjectFilter::try_new(filter) {
            Ok(filter) => filter,
            Err(err) => {
                tracing::warn!(?err, "failed to create filter");
                start_handle.give(&start_cap, Err(StorageErrorXKind::generic(err).into()));
                return;
            }
        };

        let work = source.list().await.context("list");
        match work {
            Ok(objects) => {
                let (include, exclude): (Vec<_>, Vec<_>) = objects
                    .into_iter()
                    .partition(|(o, _checksum)| filter.filter::<S>(o));
                tracing::info!(%worker_id, ?include, ?exclude, "listed objects");

                include
                    .into_iter()
                    .for_each(|object| start_handle.give(&start_cap, Ok(object)))
            }
            Err(err) => {
                tracing::warn!(?err, "failed to list oneshot source");
                start_handle.give(&start_cap, Err(err))
            }
        }
    });

    (start_stream, shutdown.press_on_drop())
}

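/// Splits each discovered object into units of work that can be fetched and decoded in
/// parallel, as defined by the provided [`OneshotFormat`].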
pub fn render_split_work<G, S, F>(
    scope: G,
    collection_id: GlobalId,
    objects: &TimelyStream<G, Result<(S::Object, S::Checksum), StorageErrorX>>,
    source: S,
    format: F,
) -> (
    TimelyStream<G, Result<F::WorkRequest<S>, StorageErrorX>>,
    PressOnDropButton,
)
where
    G: Scope,
    S: OneshotSource + Send + Sync + 'static,
    F: OneshotFormat + Send + Sync + 'static,
{
    let worker_id = scope.index();
    let mut builder = AsyncOperatorBuilder::new("CopyFrom-split_work".to_string(), scope.clone());

    let (request_handle, request_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
    let mut objects_handle = builder.new_input_for(objects, Distribute, &request_handle);

    let shutdown = builder.build(move |caps| async move {
        let [_objects_cap] = caps.try_into().unwrap();

        info!(%collection_id, %worker_id, "CopyFrom Split Work");

        while let Some(event) = objects_handle.next().await {
            let (capability, maybe_objects) = match event {
                AsyncEvent::Data(cap, req) => (cap, req),
                AsyncEvent::Progress(_) => continue,
            };

            let result = async {
                let mut requests = Vec::new();

                for maybe_object in maybe_objects {
                    let (object, checksum) = maybe_object?;

                    let format_ = format.clone();
                    let source_ = source.clone();
                    let work_requests = mz_ore::task::spawn(|| "split-work", async move {
                        info!(%worker_id, object = %object.name(), "splitting object");
                        format_.split_work(source_.clone(), object, checksum).await
                    })
                    .await?;

                    requests.extend(work_requests);
                }

                Ok::<_, StorageErrorX>(requests)
            }
            .await
            .context("split");

            match result {
                Ok(requests) => requests
                    .into_iter()
                    .for_each(|req| request_handle.give(&capability, Ok(req))),
                Err(err) => request_handle.give(&capability, Err(err)),
            }
        }
    });

    (request_stream, shutdown.press_on_drop())
}

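/// Fetches each unit of work from the remote `source`, emitting chunks of records that can
/// later be decoded into [`Row`]s.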
pub fn render_fetch_work<G, S, F>(
    scope: G,
    collection_id: GlobalId,
    source: S,
    format: F,
    work_requests: &TimelyStream<G, Result<F::WorkRequest<S>, StorageErrorX>>,
) -> (
    TimelyStream<G, Result<F::RecordChunk, StorageErrorX>>,
    PressOnDropButton,
)
where
    G: Scope,
    S: OneshotSource + Sync + 'static,
    F: OneshotFormat + Sync + 'static,
{
    let worker_id = scope.index();
    let mut builder = AsyncOperatorBuilder::new("CopyFrom-fetch_work".to_string(), scope.clone());

    let (record_handle, record_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
    let mut work_requests_handle = builder.new_input_for(work_requests, Distribute, &record_handle);

    let shutdown = builder.build(move |caps| async move {
        let [_work_cap] = caps.try_into().unwrap();

        info!(%collection_id, %worker_id, "CopyFrom Fetch Work");

        while let Some(event) = work_requests_handle.next().await {
            let (capability, maybe_requests) = match event {
                AsyncEvent::Data(cap, req) => (cap, req),
                AsyncEvent::Progress(_) => continue,
            };

            let result = async {
                for maybe_request in maybe_requests {
                    let request = maybe_request?;

                    let mut work_stream = format.fetch_work(&source, request);
                    while let Some(result) = work_stream.next().await {
                        let record_chunk = result.context("fetch worker")?;

                        record_handle.give(&capability, Ok(record_chunk));
                    }
                }

                Ok::<_, StorageErrorX>(())
            }
            .await
            .context("fetch work");

            if let Err(err) = result {
                tracing::warn!(?err, "failed to fetch");
                record_handle.give(&capability, Err(err))
            }
        }
    });

    (record_stream, shutdown.press_on_drop())
}

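/// Decodes chunks of records into [`Row`]s and maps them through the provided `mfp` so they
/// match the shape of the destination collection.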
pub fn render_decode_chunk<G, F>(
    scope: G,
    format: F,
    record_chunks: &TimelyStream<G, Result<F::RecordChunk, StorageErrorX>>,
    mfp: SafeMfpPlan,
) -> (
    TimelyStream<G, Result<Row, StorageErrorX>>,
    PressOnDropButton,
)
where
    G: Scope,
    F: OneshotFormat + 'static,
{
    let mut builder = AsyncOperatorBuilder::new("CopyFrom-decode_chunk".to_string(), scope.clone());

    let (row_handle, row_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
    let mut record_chunk_handle = builder.new_input_for(record_chunks, Distribute, &row_handle);

    let shutdown = builder.build(move |caps| async move {
        let [_row_cap] = caps.try_into().unwrap();

        let mut datum_vec = DatumVec::default();
        let row_arena = RowArena::default();
        let mut row_buf = Row::default();

        while let Some(event) = record_chunk_handle.next().await {
            let (capability, maybe_chunks) = match event {
                AsyncEvent::Data(cap, data) => (cap, data),
                AsyncEvent::Progress(_) => continue,
            };

            let result = async {
                let mut rows = Vec::new();
                for maybe_chunk in maybe_chunks {
                    let chunk = maybe_chunk?;
                    format.decode_chunk(chunk, &mut rows)?;
                }
                Ok::<_, StorageErrorX>(rows)
            }
            .await
            .context("decode chunk");

            match result {
                Ok(rows) => {
                    for row in rows {
                        // Map the decoded Row into the shape of the destination collection.
                        let mut datums = datum_vec.borrow_with(&row);
                        let result = mfp
                            .evaluate_into(&mut *datums, &row_arena, &mut row_buf)
                            .map(|row| row.cloned());

                        match result {
                            Ok(Some(row)) => row_handle.give(&capability, Ok(row)),
                            Ok(None) => {
                                mz_ore::soft_panic_or_log!("oneshot source MFP filtered out data!");
                            }
                            Err(e) => {
                                let err = StorageErrorXKind::MfpEvalError(e.to_string().into())
                                    .with_context("decode");
                                row_handle.give(&capability, Err(err))
                            }
                        }
                    }
                }
                Err(err) => row_handle.give(&capability, Err(err)),
            }
        }
    });

    (row_stream, shutdown.press_on_drop())
}

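/// Stages all of the [`Row`]s a worker received into a Persist batch for the destination
/// collection and emits the resulting [`ProtoBatch`]. The batch is only staged here, never
/// appended to the shard.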
pub fn render_stage_batches_operator<G>(
    scope: G,
    collection_id: GlobalId,
    collection_meta: &CollectionMetadata,
    persist_clients: Arc<PersistClientCache>,
    rows_stream: &TimelyStream<G, Result<Row, StorageErrorX>>,
) -> (
    TimelyStream<G, Result<ProtoBatch, StorageErrorX>>,
    PressOnDropButton,
)
where
    G: Scope,
{
    let persist_location = collection_meta.persist_location.clone();
    let shard_id = collection_meta.data_shard;
    let collection_desc = Arc::new(collection_meta.relation_desc.clone());

    let mut builder =
        AsyncOperatorBuilder::new("CopyFrom-stage_batches".to_string(), scope.clone());

    let (proto_batch_handle, proto_batch_stream) =
        builder.new_output::<CapacityContainerBuilder<_>>();
    let mut rows_handle = builder.new_input_for(rows_stream, Pipeline, &proto_batch_handle);

    let shutdown = builder.build(move |caps| async move {
        let [proto_batch_cap] = caps.try_into().unwrap();

        // Open a Persist handle that we can use to stage a batch for this shard.
        let persist_client = persist_clients
            .open(persist_location)
            .await
            .expect("failed to open Persist client");
        let persist_diagnostics = Diagnostics {
            shard_name: collection_id.to_string(),
            handle_purpose: "CopyFrom::stage_batches".to_string(),
        };
        let write_handle = persist_client
            .open_writer::<SourceData, (), mz_repr::Timestamp, StorageDiff>(
                shard_id,
                Arc::clone(&collection_desc),
                Arc::new(UnitSchema),
                persist_diagnostics,
            )
            .await
            .expect("could not open Persist shard");

        // The batch is written at the minimum timestamp; it is only staged and transmitted to
        // the caller, not appended to the shard here.
        let lower = mz_repr::Timestamp::MIN;
        let upper = Antichain::from_elem(lower.step_forward());

        let mut batch_builder = write_handle.builder(Antichain::from_elem(lower));

        while let Some(event) = rows_handle.next().await {
            let AsyncEvent::Data(_, row_batch) = event else {
                continue;
            };

            for maybe_row in row_batch {
                // Ensure the Row matches the destination collection's schema.
                let maybe_row = maybe_row.and_then(|row| {
                    Row::validate(&row, &*collection_desc).map_err(|e| {
                        StorageErrorXKind::invalid_record_batch(e).with_context("stage_batches")
                    })?;
                    Ok(row)
                });
                match maybe_row {
                    Ok(row) => {
                        let data = SourceData(Ok(row));
                        batch_builder
                            .add(&data, &(), &lower, &1)
                            .await
                            .expect("failed to add Row to batch");
                    }
                    Err(err) => {
                        // Clean up the partially written batch before reporting the error.
                        let batch = batch_builder
                            .finish(upper)
                            .await
                            .expect("failed to cleanup batch");
                        batch.delete().await;

                        proto_batch_handle
                            .give(&proto_batch_cap, Err(err).context("stage batches"));
                        return;
                    }
                }
            }
        }

        let batch = batch_builder
            .finish(upper)
            .await
            .expect("failed to create Batch");

        let proto_batch = batch.into_transmittable_batch();
        proto_batch_handle.give(&proto_batch_cap, Ok(proto_batch));
    });

    (proto_batch_stream, shutdown.press_on_drop())
}

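/// Collects the result of the ingestion on each worker and reports it via `worker_callback`.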
pub fn render_completion_operator<G, F>(
    scope: G,
    results_stream: &TimelyStream<G, Result<ProtoBatch, StorageErrorX>>,
    worker_callback: F,
) where
    G: Scope,
    F: FnOnce(Result<Option<ProtoBatch>, String>) -> () + 'static,
{
    let mut builder = AsyncOperatorBuilder::new("CopyFrom-completion".to_string(), scope.clone());
    let mut results_input = builder.new_disconnected_input(results_stream, Pipeline);

    builder.build(move |_| async move {
        let result = async move {
            let mut maybe_payload: Option<ProtoBatch> = None;

            while let Some(event) = results_input.next().await {
                if let AsyncEvent::Data(_cap, results) = event {
                    let [result] = results
                        .try_into()
                        .expect("only 1 event on the result stream");

                    if maybe_payload.is_some() {
                        panic!("expected only one batch!");
                    }

                    maybe_payload = Some(result.map_err(|e| e.to_string())?);
                }
            }

            Ok(maybe_payload)
        }
        .await;

        worker_callback(result);
    });
}

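/// Metadata about a single object (e.g. a file) in a [`OneshotSource`].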
pub trait OneshotObject {
    /// Name of the object, including any extensions.
    fn name(&self) -> &str;

    /// Path of the object within the remote source.
    fn path(&self) -> &str;

    /// Size of this object in bytes.
    fn size(&self) -> usize;

    /// Encodings of the object, if any.
    fn encodings(&self) -> &[Encoding];
}

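/// Content encodings that an object from a [`OneshotSource`] may be compressed with.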
#[derive(Debug, Copy, Clone, Serialize, Deserialize)]
pub enum Encoding {
    Bzip2,
    Gzip,
    Xz,
    Zstd,
}

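/// A remote source of objects (e.g. an HTTP server or an S3 bucket) that a oneshot ingestion
/// can list and fetch from.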
pub trait OneshotSource: Clone + Send + Unpin {
    /// An individual object within the source, e.g. a file.
    type Object: OneshotObject
        + Debug
        + Clone
        + Send
        + Unpin
        + Serialize
        + DeserializeOwned
        + 'static;
    /// Checksum for an individual object.
    type Checksum: Debug + Clone + Send + Unpin + Serialize + DeserializeOwned + 'static;

    /// List all objects available from the source, along with their checksums.
    fn list<'a>(
        &'a self,
    ) -> impl Future<Output = Result<Vec<(Self::Object, Self::Checksum)>, StorageErrorX>> + Send;

    /// Fetch the contents of an object, optionally restricted to a byte range.
    fn get<'s>(
        &'s self,
        object: Self::Object,
        checksum: Self::Checksum,
        range: Option<std::ops::RangeInclusive<usize>>,
    ) -> BoxStream<'s, Result<Bytes, StorageErrorX>>;
}

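/// Enum that dispatches over all supported [`OneshotSource`] implementations.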
#[derive(Clone, Debug)]
pub(crate) enum SourceKind {
    Http(HttpOneshotSource),
    AwsS3(AwsS3Source),
}

impl OneshotSource for SourceKind {
    type Object = ObjectKind;
    type Checksum = ChecksumKind;

    async fn list<'a>(&'a self) -> Result<Vec<(Self::Object, Self::Checksum)>, StorageErrorX> {
        match self {
            SourceKind::Http(http) => {
                let objects = http.list().await.context("http")?;
                let objects = objects
                    .into_iter()
                    .map(|(object, checksum)| {
                        (ObjectKind::Http(object), ChecksumKind::Http(checksum))
                    })
                    .collect();
                Ok(objects)
            }
            SourceKind::AwsS3(s3) => {
                let objects = s3.list().await.context("s3")?;
                let objects = objects
                    .into_iter()
                    .map(|(object, checksum)| {
                        (ObjectKind::AwsS3(object), ChecksumKind::AwsS3(checksum))
                    })
                    .collect();
                Ok(objects)
            }
        }
    }

    fn get<'s>(
        &'s self,
        object: Self::Object,
        checksum: Self::Checksum,
        range: Option<std::ops::RangeInclusive<usize>>,
    ) -> BoxStream<'s, Result<Bytes, StorageErrorX>> {
        match (self, object, checksum) {
            (SourceKind::Http(http), ObjectKind::Http(object), ChecksumKind::Http(checksum)) => {
                http.get(object, checksum, range)
                    .map(|result| result.context("http"))
                    .boxed()
            }
            (SourceKind::AwsS3(s3), ObjectKind::AwsS3(object), ChecksumKind::AwsS3(checksum)) => s3
                .get(object, checksum, range)
                .map(|result| result.context("aws_s3"))
                .boxed(),
            (SourceKind::AwsS3(_) | SourceKind::Http(_), _, _) => {
                unreachable!("programming error! wrong source, object, and checksum kind");
            }
        }
    }
}

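/// Enum that wraps the object types of all supported [`OneshotSource`]s.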
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) enum ObjectKind {
    Http(HttpObject),
    AwsS3(S3Object),
}

impl OneshotObject for ObjectKind {
    fn name(&self) -> &str {
        match self {
            ObjectKind::Http(object) => object.name(),
            ObjectKind::AwsS3(object) => object.name(),
        }
    }

    fn path(&self) -> &str {
        match self {
            ObjectKind::Http(object) => object.path(),
            ObjectKind::AwsS3(object) => object.path(),
        }
    }

    fn size(&self) -> usize {
        match self {
            ObjectKind::Http(object) => object.size(),
            ObjectKind::AwsS3(object) => object.size(),
        }
    }

    fn encodings(&self) -> &[Encoding] {
        match self {
            ObjectKind::Http(object) => object.encodings(),
            ObjectKind::AwsS3(object) => object.encodings(),
        }
    }
}

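/// Enum that wraps the checksum types of all supported [`OneshotSource`]s.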
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) enum ChecksumKind {
    Http(HttpChecksum),
    AwsS3(S3Checksum),
}

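/// Defines a data format (e.g. CSV or Parquet) that a oneshot ingestion knows how to split
/// into parallel units of work, fetch, and decode into [`Row`]s.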
pub trait OneshotFormat: Clone {
    /// A single unit of work for decoding this format, e.g. a byte range of a CSV file or a
    /// row group of a Parquet file.
    type WorkRequest<S>: Debug + Clone + Send + Serialize + DeserializeOwned + 'static
    where
        S: OneshotSource;
    /// A chunk of records in this format that can be decoded into [`Row`]s.
    type RecordChunk: Debug + Clone + Send + Serialize + DeserializeOwned + 'static;

    /// Given an object, split it into individual units of work that can be fetched and decoded
    /// in parallel.
    fn split_work<S: OneshotSource + Send>(
        &self,
        source: S,
        object: S::Object,
        checksum: S::Checksum,
    ) -> impl Future<Output = Result<Vec<Self::WorkRequest<S>>, StorageErrorX>> + Send;

    /// Fetch a single unit of work, returning a stream of record chunks.
    fn fetch_work<'a, S: OneshotSource + Sync + 'static>(
        &'a self,
        source: &'a S,
        request: Self::WorkRequest<S>,
    ) -> BoxStream<'a, Result<Self::RecordChunk, StorageErrorX>>;

    /// Decode a chunk of records, appending the results to `rows`.
    fn decode_chunk(
        &self,
        chunk: Self::RecordChunk,
        rows: &mut Vec<Row>,
    ) -> Result<usize, StorageErrorX>;
}

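/// Enum that dispatches over all supported [`OneshotFormat`] implementations.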
#[derive(Clone, Debug)]
pub(crate) enum FormatKind {
    Csv(CsvDecoder),
    Parquet(ParquetFormat),
}

impl OneshotFormat for FormatKind {
    type WorkRequest<S>
        = RequestKind<S::Object, S::Checksum>
    where
        S: OneshotSource;
    type RecordChunk = RecordChunkKind;

    async fn split_work<S: OneshotSource + Send>(
        &self,
        source: S,
        object: S::Object,
        checksum: S::Checksum,
    ) -> Result<Vec<Self::WorkRequest<S>>, StorageErrorX> {
        match self {
            FormatKind::Csv(csv) => {
                let work = csv
                    .split_work(source, object, checksum)
                    .await
                    .context("csv")?
                    .into_iter()
                    .map(RequestKind::Csv)
                    .collect();
                Ok(work)
            }
            FormatKind::Parquet(parquet) => {
                let work = parquet
                    .split_work(source, object, checksum)
                    .await
                    .context("parquet")?
                    .into_iter()
                    .map(RequestKind::Parquet)
                    .collect();
                Ok(work)
            }
        }
    }

    fn fetch_work<'a, S: OneshotSource + Sync + 'static>(
        &'a self,
        source: &'a S,
        request: Self::WorkRequest<S>,
    ) -> BoxStream<'a, Result<Self::RecordChunk, StorageErrorX>> {
        match (self, request) {
            (FormatKind::Csv(csv), RequestKind::Csv(request)) => csv
                .fetch_work(source, request)
                .map_ok(RecordChunkKind::Csv)
                .map(|result| result.context("csv"))
                .boxed(),
            (FormatKind::Parquet(parquet), RequestKind::Parquet(request)) => parquet
                .fetch_work(source, request)
                .map_ok(RecordChunkKind::Parquet)
                .map(|result| result.context("parquet"))
                .boxed(),
            (FormatKind::Parquet(_), RequestKind::Csv(_))
            | (FormatKind::Csv(_), RequestKind::Parquet(_)) => {
                unreachable!("programming error, {self:?}")
            }
        }
    }

    fn decode_chunk(
        &self,
        chunk: Self::RecordChunk,
        rows: &mut Vec<Row>,
    ) -> Result<usize, StorageErrorX> {
        match (self, chunk) {
            (FormatKind::Csv(csv), RecordChunkKind::Csv(chunk)) => {
                csv.decode_chunk(chunk, rows).context("csv")
            }
            (FormatKind::Parquet(parquet), RecordChunkKind::Parquet(chunk)) => {
                parquet.decode_chunk(chunk, rows).context("parquet")
            }
            (FormatKind::Parquet(_), RecordChunkKind::Csv(_))
            | (FormatKind::Csv(_), RecordChunkKind::Parquet(_)) => {
                unreachable!("programming error, {self:?}")
            }
        }
    }
}

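/// Enum that wraps the work request types of all supported [`OneshotFormat`]s.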
#[derive(Clone, Debug, Serialize, Deserialize)]
pub(crate) enum RequestKind<O, C> {
    Csv(CsvWorkRequest<O, C>),
    Parquet(ParquetWorkRequest<O, C>),
}

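/// Enum that wraps the record chunk types of all supported [`OneshotFormat`]s.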
#[derive(Clone, Debug, Serialize, Deserialize)]
pub(crate) enum RecordChunkKind {
    Csv(CsvRecord),
    Parquet(ParquetRowGroup),
}

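/// Filter that determines which objects from a [`OneshotSource`] should be ingested.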
pub(crate) enum ObjectFilter {
    None,
    Files(BTreeSet<Box<str>>),
    Pattern(glob::Pattern),
}

impl ObjectFilter {
    pub fn try_new(filter: ContentFilter) -> Result<Self, anyhow::Error> {
        match filter {
            ContentFilter::None => Ok(ObjectFilter::None),
            ContentFilter::Files(files) => {
                let files = files.into_iter().map(|f| f.into()).collect();
                Ok(ObjectFilter::Files(files))
            }
            ContentFilter::Pattern(pattern) => {
                let pattern = glob::Pattern::new(&pattern)?;
                Ok(ObjectFilter::Pattern(pattern))
            }
        }
    }

    /// Returns whether the object should be included.
    pub fn filter<S: OneshotSource>(&self, object: &S::Object) -> bool {
        match self {
            ObjectFilter::None => true,
            ObjectFilter::Files(files) => files.contains(object.path()),
            ObjectFilter::Pattern(pattern) => pattern.matches(object.path()),
        }
    }
}

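/// A serializable error type with a stack of context, similar in spirit to `anyhow::Error`,
/// used by the oneshot source operators.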
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct StorageErrorX {
    kind: StorageErrorXKind,
    context: LinkedList<String>,
}

impl fmt::Display for StorageErrorX {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        writeln!(f, "error: {}", self.kind)?;
        writeln!(f, "causes: {:?}", self.context)?;
        Ok(())
    }
}

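/// The specific kind of error that occurred during a oneshot ingestion.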
#[derive(Debug, Clone, Serialize, Deserialize, thiserror::Error)]
pub enum StorageErrorXKind {
    #[error("csv decoding error: {0}")]
    CsvDecoding(Arc<str>),
    #[error("parquet error: {0}")]
    ParquetError(Arc<str>),
    #[error("reqwest error: {0}")]
    Reqwest(Arc<str>),
    #[error("aws s3 request error: {0}")]
    AwsS3Request(String),
    #[error("aws s3 bytestream error: {0}")]
    AwsS3Bytes(Arc<str>),
    #[error("invalid reqwest header: {0}")]
    InvalidHeader(Arc<str>),
    #[error("failed to decode Row from a record batch: {0}")]
    InvalidRecordBatch(Arc<str>),
    #[error("programming error: {0}")]
    ProgrammingError(Arc<str>),
    #[error("failed to get the size of an object")]
    MissingSize,
    #[error("object is missing the required '{0}' field")]
    MissingField(Arc<str>),
    #[error("failed while evaluating the provided mfp: '{0}'")]
    MfpEvalError(Arc<str>),
    #[error("something went wrong: {0}")]
    Generic(String),
}

impl From<csv_async::Error> for StorageErrorXKind {
    fn from(err: csv_async::Error) -> Self {
        StorageErrorXKind::CsvDecoding(err.to_string().into())
    }
}

impl From<reqwest::Error> for StorageErrorXKind {
    fn from(err: reqwest::Error) -> Self {
        StorageErrorXKind::Reqwest(err.to_string().into())
    }
}

impl From<reqwest::header::ToStrError> for StorageErrorXKind {
    fn from(err: reqwest::header::ToStrError) -> Self {
        StorageErrorXKind::InvalidHeader(err.to_string().into())
    }
}

impl From<aws_smithy_types::byte_stream::error::Error> for StorageErrorXKind {
    fn from(err: aws_smithy_types::byte_stream::error::Error) -> Self {
        StorageErrorXKind::AwsS3Request(err.to_string())
    }
}

impl From<::parquet::errors::ParquetError> for StorageErrorXKind {
    fn from(err: ::parquet::errors::ParquetError) -> Self {
        StorageErrorXKind::ParquetError(err.to_string().into())
    }
}

impl StorageErrorXKind {
    pub fn with_context<C: Display>(self, context: C) -> StorageErrorX {
        StorageErrorX {
            kind: self,
            context: LinkedList::from([context.to_string()]),
        }
    }

    pub fn invalid_record_batch<S: Into<Arc<str>>>(error: S) -> StorageErrorXKind {
        StorageErrorXKind::InvalidRecordBatch(error.into())
    }

    pub fn generic<C: Display>(error: C) -> StorageErrorXKind {
        StorageErrorXKind::Generic(error.to_string())
    }

    pub fn programming_error<S: Into<Arc<str>>>(error: S) -> StorageErrorXKind {
        StorageErrorXKind::ProgrammingError(error.into())
    }
}

impl<E> From<E> for StorageErrorX
where
    E: Into<StorageErrorXKind>,
{
    fn from(err: E) -> Self {
        StorageErrorX {
            kind: err.into(),
            context: LinkedList::new(),
        }
    }
}

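/// Helper trait for attaching context to a `Result`, analogous to `anyhow::Context`.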
trait StorageErrorXContext<T> {
    fn context<C>(self, context: C) -> Result<T, StorageErrorX>
    where
        C: Display;
}

impl<T, E> StorageErrorXContext<T> for Result<T, E>
where
    E: Into<StorageErrorXKind>,
{
    fn context<C>(self, context: C) -> Result<T, StorageErrorX>
    where
        C: Display,
    {
        match self {
            Ok(val) => Ok(val),
            Err(kind) => Err(StorageErrorX {
                kind: kind.into(),
                context: LinkedList::from([context.to_string()]),
            }),
        }
    }
}

impl<T> StorageErrorXContext<T> for Result<T, StorageErrorX> {
    fn context<C>(self, context: C) -> Result<T, StorageErrorX>
    where
        C: Display,
    {
        match self {
            Ok(val) => Ok(val),
            Err(mut e) => {
                e.context.push_back(context.to_string());
                Err(e)
            }
        }
    }
}