1use std::fmt;
66use std::sync::Arc;
67
68use aws_sdk_s3::error::DisplayErrorContext;
69use bytes::Bytes;
70use differential_dataflow::Hashable;
71use futures::stream::BoxStream;
72use futures::{StreamExt, TryStreamExt};
73use mz_expr::SafeMfpPlan;
74use mz_ore::cast::CastFrom;
75use mz_ore::error::ErrorExt;
76use mz_persist_client::Diagnostics;
77use mz_persist_client::batch::ProtoBatch;
78use mz_persist_client::cache::PersistClientCache;
79use mz_persist_types::Codec;
80use mz_persist_types::codec_impls::UnitSchema;
81use mz_repr::{DatumVec, GlobalId, Row, RowArena, Timestamp};
82use mz_storage_types::StorageDiff;
83use mz_storage_types::connections::ConnectionContext;
84use mz_storage_types::controller::CollectionMetadata;
85use mz_storage_types::oneshot_sources::{
86 ContentFilter, ContentFormat, ContentSource, OneshotIngestionRequest,
87};
88use mz_storage_types::sources::SourceData;
89use mz_timely_util::builder_async::{
90 Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
91};
92use mz_timely_util::pact::Distribute;
93use serde::de::DeserializeOwned;
94use serde::{Deserialize, Serialize};
95use std::collections::{BTreeSet, LinkedList};
96use std::fmt::{Debug, Display};
97use std::future::Future;
98use timely::container::CapacityContainerBuilder;
99use timely::dataflow::channels::pact::Pipeline;
100use timely::dataflow::{Scope, StreamVec};
101use timely::progress::Antichain;
102use tracing::info;
103
104use crate::oneshot_source::aws_source::{AwsS3Source, S3Checksum, S3Object};
105use crate::oneshot_source::csv::{CsvDecoder, CsvRecord, CsvWorkRequest};
106use crate::oneshot_source::http_source::{
107 HttpChecksum, HttpObject, HttpOneshotSource, build_http_client,
108};
109use crate::oneshot_source::parquet::{ParquetFormat, ParquetRowGroup, ParquetWorkRequest};
110
111pub mod csv;
112pub mod parquet;
113
114pub mod aws_source;
115pub mod http_source;
116
117mod util;
118
119pub fn render<'scope, F>(
137 scope: Scope<'scope, Timestamp>,
138 persist_clients: Arc<PersistClientCache>,
139 connection_context: ConnectionContext,
140 collection_id: GlobalId,
141 collection_meta: CollectionMetadata,
142 request: OneshotIngestionRequest,
143 enforce_external_addresses: bool,
144 worker_callback: F,
145) -> Vec<PressOnDropButton>
146where
147 F: FnOnce(Result<Option<ProtoBatch>, String>) -> () + 'static,
148{
149 let OneshotIngestionRequest {
150 source,
151 format,
152 filter,
153 shape,
154 } = request;
155
156 let source = match source {
157 ContentSource::Http { url } => {
158 let client = match build_http_client(enforce_external_addresses) {
159 Ok(client) => client,
160 Err(err) => {
161 worker_callback(Err(format!(
162 "failed to build HTTP client for COPY FROM: {err}"
163 )));
164 return Vec::new();
165 }
166 };
167 let source = HttpOneshotSource::new(client, url);
168 SourceKind::Http(source)
169 }
170 ContentSource::AwsS3 {
171 connection,
172 connection_id,
173 uri,
174 } => {
175 let use_checksum = !(connection.endpoint.is_some()
178 && !connection.endpoint.as_ref().unwrap().contains("amazonaws"))
179 || format != ContentFormat::Parquet;
180 if !use_checksum {
181 tracing::info!(
182 "disabling checksum validation for S3 source because endpoint: {:?} is overridden and format is Parquet",
183 connection.endpoint
184 );
185 }
186 let source = AwsS3Source::new(
187 connection,
188 connection_id,
189 connection_context,
190 uri,
191 use_checksum,
192 enforce_external_addresses,
193 );
194 SourceKind::AwsS3(source)
195 }
196 };
197 tracing::info!(?source, "created oneshot source");
198
199 let format = match format {
200 ContentFormat::Csv(params) => {
201 let format = CsvDecoder::new(params, &shape.source_desc);
202 FormatKind::Csv(format)
203 }
204 ContentFormat::Parquet => {
205 let format = ParquetFormat::new(shape.source_desc);
206 FormatKind::Parquet(format)
207 }
208 };
209
210 let (objects_stream, discover_token) =
212 render_discover_objects(scope.clone(), collection_id, source.clone(), filter);
213 let (work_stream, split_token) = render_split_work(
215 scope.clone(),
216 collection_id,
217 objects_stream,
218 source.clone(),
219 format.clone(),
220 );
221 let (records_stream, fetch_token) = render_fetch_work(
223 scope.clone(),
224 collection_id,
225 source.clone(),
226 format.clone(),
227 work_stream,
228 );
229 let (rows_stream, decode_token) = render_decode_chunk(
231 scope.clone(),
232 format.clone(),
233 records_stream,
234 shape.source_mfp,
235 );
236 let (batch_stream, batch_token) = render_stage_batches_operator(
238 scope.clone(),
239 collection_id,
240 &collection_meta,
241 persist_clients,
242 rows_stream,
243 );
244
245 render_completion_operator(scope, batch_stream, worker_callback);
247
248 let tokens = vec![
249 discover_token,
250 split_token,
251 fetch_token,
252 decode_token,
253 batch_token,
254 ];
255
256 tokens
257}
258
259pub fn render_discover_objects<'scope, S>(
262 scope: Scope<'scope, Timestamp>,
263 collection_id: GlobalId,
264 source: S,
265 filter: ContentFilter,
266) -> (
267 StreamVec<'scope, Timestamp, Result<(S::Object, S::Checksum), StorageErrorX>>,
268 PressOnDropButton,
269)
270where
271 S: OneshotSource + 'static,
272{
273 let worker_id = scope.index();
275 let num_workers = scope.peers();
276 let active_worker_id = usize::cast_from((collection_id, "discover").hashed()) % num_workers;
277 let is_active_worker = worker_id == active_worker_id;
278
279 let mut builder = AsyncOperatorBuilder::new("CopyFrom-discover".to_string(), scope.clone());
280
281 let (start_handle, start_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
282
283 let shutdown = builder.build(move |caps| async move {
284 let [start_cap] = caps.try_into().unwrap();
285
286 if !is_active_worker {
287 return;
288 }
289
290 let filter = match ObjectFilter::try_new(filter) {
291 Ok(filter) => filter,
292 Err(err) => {
293 tracing::warn!(?err, "failed to create filter");
294 start_handle.give(&start_cap, Err(StorageErrorXKind::generic(err).into()));
295 return;
296 }
297 };
298
299 let work = source.list().await.context("list");
300 match work {
301 Ok(objects) => {
302 let (include, exclude): (Vec<_>, Vec<_>) = objects
303 .into_iter()
304 .partition(|(o, _checksum)| filter.filter::<S>(o));
305 tracing::info!(%worker_id, ?include, ?exclude, "listed objects");
306 if include.is_empty() {
307 let err = StorageErrorXKind::NoMatchingFiles.with_context("discover");
308 start_handle.give(&start_cap, Err(err));
309 return;
310 }
311 include
312 .into_iter()
313 .for_each(|object| start_handle.give(&start_cap, Ok(object)))
314 }
315 Err(err) => {
316 tracing::warn!(?err, "failed to list oneshot source");
317 start_handle.give(&start_cap, Err(err))
318 }
319 }
320 });
321
322 (start_stream, shutdown.press_on_drop())
323}
324
325pub fn render_split_work<'scope, T, S, F>(
328 scope: Scope<'scope, T>,
329 collection_id: GlobalId,
330 objects: StreamVec<'scope, T, Result<(S::Object, S::Checksum), StorageErrorX>>,
331 source: S,
332 format: F,
333) -> (
334 StreamVec<'scope, T, Result<F::WorkRequest<S>, StorageErrorX>>,
335 PressOnDropButton,
336)
337where
338 T: timely::progress::Timestamp,
339 S: OneshotSource + Send + Sync + 'static,
340 F: OneshotFormat + Send + Sync + 'static,
341{
342 let worker_id = scope.index();
343 let mut builder = AsyncOperatorBuilder::new("CopyFrom-split_work".to_string(), scope.clone());
344
345 let (request_handle, request_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
346 let mut objects_handle = builder.new_input_for(objects, Distribute, &request_handle);
347
348 let shutdown = builder.build(move |caps| async move {
349 let [_objects_cap] = caps.try_into().unwrap();
350
351 info!(%collection_id, %worker_id, "CopyFrom Split Work");
352
353 while let Some(event) = objects_handle.next().await {
354 let (capability, maybe_objects) = match event {
355 AsyncEvent::Data(cap, req) => (cap, req),
356 AsyncEvent::Progress(_) => continue,
357 };
358
359 let result = async {
362 let mut requests = Vec::new();
363
364 for maybe_object in maybe_objects {
365 let (object, checksum) = maybe_object?;
367
368 let format_ = format.clone();
369 let source_ = source.clone();
370 let work_requests = mz_ore::task::spawn(|| "split-work", async move {
371 info!(%worker_id, object = %object.name(), "splitting object");
372 format_.split_work(source_.clone(), object, checksum).await
373 })
374 .await?;
375
376 requests.extend(work_requests);
377 }
378
379 Ok::<_, StorageErrorX>(requests)
380 }
381 .await
382 .context("split");
383
384 match result {
385 Ok(requests) => requests
386 .into_iter()
387 .for_each(|req| request_handle.give(&capability, Ok(req))),
388 Err(err) => request_handle.give(&capability, Err(err)),
389 }
390 }
391 });
392
393 (request_stream, shutdown.press_on_drop())
394}
395
396pub fn render_fetch_work<'scope, T, S, F>(
400 scope: Scope<'scope, T>,
401 collection_id: GlobalId,
402 source: S,
403 format: F,
404 work_requests: StreamVec<'scope, T, Result<F::WorkRequest<S>, StorageErrorX>>,
405) -> (
406 StreamVec<'scope, T, Result<F::RecordChunk, StorageErrorX>>,
407 PressOnDropButton,
408)
409where
410 T: timely::progress::Timestamp,
411 S: OneshotSource + Sync + 'static,
412 F: OneshotFormat + Sync + 'static,
413{
414 let worker_id = scope.index();
415 let mut builder = AsyncOperatorBuilder::new("CopyFrom-fetch_work".to_string(), scope.clone());
416
417 let (record_handle, record_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
418 let mut work_requests_handle = builder.new_input_for(work_requests, Distribute, &record_handle);
419
420 let shutdown = builder.build(move |caps| async move {
421 let [_work_cap] = caps.try_into().unwrap();
422
423 info!(%collection_id, %worker_id, "CopyFrom Fetch Work");
424
425 while let Some(event) = work_requests_handle.next().await {
426 let (capability, maybe_requests) = match event {
427 AsyncEvent::Data(cap, req) => (cap, req),
428 AsyncEvent::Progress(_) => continue,
429 };
430
431 let result = async {
433 for maybe_request in maybe_requests {
435 let request = maybe_request?;
436
437 let mut work_stream = format.fetch_work(&source, request);
438 while let Some(result) = work_stream.next().await {
439 let record_chunk = result.context("fetch worker")?;
441
442 record_handle.give(&capability, Ok(record_chunk));
447 }
448 }
449
450 Ok::<_, StorageErrorX>(())
451 }
452 .await
453 .context("fetch work");
454
455 if let Err(err) = result {
456 tracing::warn!(?err, "failed to fetch");
457 record_handle.give(&capability, Err(err))
458 }
459 }
460 });
461
462 (record_stream, shutdown.press_on_drop())
463}
464
465pub fn render_decode_chunk<'scope, T, F>(
468 scope: Scope<'scope, T>,
469 format: F,
470 record_chunks: StreamVec<'scope, T, Result<F::RecordChunk, StorageErrorX>>,
471 mfp: SafeMfpPlan,
472) -> (
473 StreamVec<'scope, T, Result<Row, StorageErrorX>>,
474 PressOnDropButton,
475)
476where
477 T: timely::progress::Timestamp,
478 F: OneshotFormat + 'static,
479{
480 let mut builder = AsyncOperatorBuilder::new("CopyFrom-decode_chunk".to_string(), scope.clone());
481
482 let (row_handle, row_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
483 let mut record_chunk_handle = builder.new_input_for(record_chunks, Distribute, &row_handle);
484
485 let shutdown = builder.build(move |caps| async move {
486 let [_row_cap] = caps.try_into().unwrap();
487
488 let mut datum_vec = DatumVec::default();
489 let mut row_buf = Row::default();
490
491 while let Some(event) = record_chunk_handle.next().await {
492 let (capability, maybe_chunks) = match event {
493 AsyncEvent::Data(cap, data) => (cap, data),
494 AsyncEvent::Progress(_) => continue,
495 };
496
497 let result = async {
498 let mut rows = Vec::new();
499 for maybe_chunk in maybe_chunks {
500 let chunk = maybe_chunk?;
501 format.decode_chunk(chunk, &mut rows)?;
502 }
503 Ok::<_, StorageErrorX>(rows)
504 }
505 .await
506 .context("decode chunk");
507
508 match result {
509 Ok(rows) => {
510 let mut row_arena = RowArena::new();
514 for row in rows {
517 row_arena.clear();
518 let mut datums = datum_vec.borrow_with(&row);
519 let result = mfp
520 .evaluate_into(&mut *datums, &row_arena, &mut row_buf)
521 .map(|row| row.cloned());
522
523 match result {
524 Ok(Some(row)) => row_handle.give(&capability, Ok(row)),
525 Ok(None) => {
526 mz_ore::soft_panic_or_log!("oneshot source MFP filtered out data!");
529 }
530 Err(e) => {
531 let err = StorageErrorXKind::MfpEvalError(e.to_string().into())
532 .with_context("decode");
533 row_handle.give(&capability, Err(err))
534 }
535 }
536 }
537 }
538 Err(err) => row_handle.give(&capability, Err(err)),
539 }
540 }
541 });
542
543 (row_stream, shutdown.press_on_drop())
544}
545
/// Renders an operator that writes every incoming [`Row`] into a single Persist batch for
/// the data shard in `collection_meta`, and emits that batch as a [`ProtoBatch`].
///
/// The batch is only staged here; this operator does not append it to the shard.
///
/// Rows arrive via the `Pipeline` pact, so each worker stages the rows produced on that
/// worker. Each row is validated against the collection's relation description first.
///
/// On the first error, whether from upstream or a failed validation, the in-progress batch
/// is finished and deleted, the error is emitted, and the operator exits without emitting a
/// batch. Otherwise, once the input is exhausted, exactly one `ProtoBatch` is emitted.
///
/// # Panics
///
/// Panics if the Persist client or writer cannot be opened, or if adding a row to or
/// finishing the batch fails.
pub fn render_stage_batches_operator<'scope, T>(
    scope: Scope<'scope, T>,
    collection_id: GlobalId,
    collection_meta: &CollectionMetadata,
    persist_clients: Arc<PersistClientCache>,
    rows_stream: StreamVec<'scope, T, Result<Row, StorageErrorX>>,
) -> (
    StreamVec<'scope, T, Result<ProtoBatch, StorageErrorX>>,
    PressOnDropButton,
)
where
    T: timely::progress::Timestamp,
{
    let persist_location = collection_meta.persist_location.clone();
    let shard_id = collection_meta.data_shard;
    let collection_desc = Arc::new(collection_meta.relation_desc.clone());

    let mut builder =
        AsyncOperatorBuilder::new("CopyFrom-stage_batches".to_string(), scope.clone());

    let (proto_batch_handle, proto_batch_stream) =
        builder.new_output::<CapacityContainerBuilder<_>>();
    let mut rows_handle = builder.new_input_for(rows_stream, Pipeline, &proto_batch_handle);

    let shutdown = builder.build(move |caps| async move {
        // The single output is given at the operator's initial capability; input
        // capabilities are ignored.
        let [proto_batch_cap] = caps.try_into().unwrap();

        let persist_client = persist_clients
            .open(persist_location)
            .await
            .expect("failed to open Persist client");
        let persist_diagnostics = Diagnostics {
            shard_name: collection_id.to_string(),
            handle_purpose: "CopyFrom::stage_batches".to_string(),
        };
        let write_handle = persist_client
            .open_writer::<SourceData, (), mz_repr::Timestamp, StorageDiff>(
                shard_id,
                Arc::clone(&collection_desc),
                Arc::new(UnitSchema),
                persist_diagnostics,
            )
            .await
            .expect("could not open Persist shard");

        // Every row is staged at the minimum timestamp with a diff of 1, in a batch whose
        // bounds are [MIN, MIN.step_forward()).
        // NOTE(review): presumably the batch is re-timestamped when it is linked into the
        // shard by the caller — confirm.
        let lower = mz_repr::Timestamp::MIN;
        let upper = Antichain::from_elem(lower.step_forward());

        let mut batch_builder = write_handle.builder(Antichain::from_elem(lower));

        while let Some(event) = rows_handle.next().await {
            let AsyncEvent::Data(_, row_batch) = event else {
                continue;
            };

            for maybe_row in row_batch {
                // Turn a row that does not match the collection's schema into an error.
                let maybe_row = maybe_row.and_then(|row| {
                    Row::validate(&row, &*collection_desc).map_err(|e| {
                        StorageErrorXKind::invalid_record_batch(e).with_context("stage_batches")
                    })?;
                    Ok(row)
                });
                match maybe_row {
                    Ok(row) => {
                        let data = SourceData(Ok(row));
                        batch_builder
                            .add(&data, &(), &lower, &1)
                            .await
                            .expect("failed to add Row to batch");
                    }
                    Err(err) => {
                        // Finish and delete the partial batch so its data is not left
                        // behind, then forward the error and stop.
                        let batch = batch_builder
                            .finish(upper)
                            .await
                            .expect("failed to cleanup batch");
                        batch.delete().await;

                        proto_batch_handle
                            .give(&proto_batch_cap, Err(err).context("stage batches"));
                        return;
                    }
                }
            }
        }

        let batch = batch_builder
            .finish(upper)
            .await
            .expect("failed to create Batch");

        // NOTE(review): once converted to a transmittable `ProtoBatch`, nothing in this
        // operator deletes the batch if it is never linked into the shard — verify the
        // caller cleans it up on failure.
        let proto_batch = batch.into_transmittable_batch();
        proto_batch_handle.give(&proto_batch_cap, Ok(proto_batch));
    });

    (proto_batch_stream, shutdown.press_on_drop())
}
661
662pub fn render_completion_operator<'scope, T, F>(
665 scope: Scope<'scope, T>,
666 results_stream: StreamVec<'scope, T, Result<ProtoBatch, StorageErrorX>>,
667 worker_callback: F,
668) where
669 T: timely::progress::Timestamp,
670 F: FnOnce(Result<Option<ProtoBatch>, String>) -> () + 'static,
671{
672 let mut builder = AsyncOperatorBuilder::new("CopyFrom-completion".to_string(), scope.clone());
673 let mut results_input = builder.new_disconnected_input(results_stream, Pipeline);
674
675 builder.build(move |_| async move {
676 let result = async move {
677 let mut maybe_payload: Option<ProtoBatch> = None;
678
679 while let Some(event) = results_input.next().await {
680 if let AsyncEvent::Data(_cap, results) = event {
681 let [result] = results
682 .try_into()
683 .expect("only 1 event on the result stream");
684
685 if maybe_payload.is_some() {
687 panic!("expected only one batch!");
688 }
689
690 maybe_payload = Some(result.map_err(|e| e.to_string())?);
691 }
692 }
693
694 Ok(maybe_payload)
695 }
696 .await;
697
698 worker_callback(result);
700 });
701}
702
/// An individual object that a [`OneshotSource`] lists and fetches.
pub trait OneshotObject {
    /// Name of the object; in this module it is used when logging work splits.
    fn name(&self) -> &str;

    /// Path of the object; this is what file-list and glob-pattern filters are matched
    /// against during discovery.
    fn path(&self) -> &str;

    /// Size of the object (presumably in bytes — TODO confirm against implementors).
    fn size(&self) -> usize;

    /// Encodings the object is stored with, see [`Encoding`].
    fn encodings(&self) -> &[Encoding];
}

/// A compression encoding that an object may be stored with.
#[derive(Debug, Copy, Clone, Serialize, Deserialize)]
pub enum Encoding {
    Bzip2,
    Gzip,
    Xz,
    Zstd,
}
730
/// A place where objects can be listed and their contents fetched for a oneshot ingestion.
pub trait OneshotSource: Clone + Send + Unpin {
    /// An individual object this source can list and fetch.
    type Object: OneshotObject
        + Debug
        + Clone
        + Send
        + Unpin
        + Serialize
        + DeserializeOwned
        + 'static;
    /// Checksum returned alongside each listed object and handed back to
    /// [`OneshotSource::get`].
    type Checksum: Debug + Clone + Send + Unpin + Serialize + DeserializeOwned + 'static;

    /// Lists the objects available from this source, each paired with its checksum.
    ///
    /// In this module, filtering by file list or glob pattern is applied by the caller
    /// afterwards, not by `list` itself.
    fn list<'a>(
        &'a self,
    ) -> impl Future<Output = Result<Vec<(Self::Object, Self::Checksum)>, StorageErrorX>> + Send;

    /// Streams the contents of `object`, optionally restricted to the inclusive `range`
    /// (presumably byte offsets — TODO confirm against implementors).
    fn get<'s>(
        &'s self,
        object: Self::Object,
        checksum: Self::Checksum,
        range: Option<std::ops::RangeInclusive<usize>>,
    ) -> BoxStream<'s, Result<Bytes, StorageErrorX>>;
}

/// Type-erased [`OneshotSource`], with one variant per supported source.
#[derive(Clone, Debug)]
pub(crate) enum SourceKind {
    Http(HttpOneshotSource),
    AwsS3(AwsS3Source),
}
770
771impl OneshotSource for SourceKind {
772 type Object = ObjectKind;
773 type Checksum = ChecksumKind;
774
775 async fn list<'a>(&'a self) -> Result<Vec<(Self::Object, Self::Checksum)>, StorageErrorX> {
776 match self {
777 SourceKind::Http(http) => {
778 let objects = http.list().await.context("http")?;
779 let objects = objects
780 .into_iter()
781 .map(|(object, checksum)| {
782 (ObjectKind::Http(object), ChecksumKind::Http(checksum))
783 })
784 .collect();
785 Ok(objects)
786 }
787 SourceKind::AwsS3(s3) => {
788 let objects = s3.list().await.context("s3")?;
789 let objects = objects
790 .into_iter()
791 .map(|(object, checksum)| {
792 (ObjectKind::AwsS3(object), ChecksumKind::AwsS3(checksum))
793 })
794 .collect();
795 Ok(objects)
796 }
797 }
798 }
799
800 fn get<'s>(
801 &'s self,
802 object: Self::Object,
803 checksum: Self::Checksum,
804 range: Option<std::ops::RangeInclusive<usize>>,
805 ) -> BoxStream<'s, Result<Bytes, StorageErrorX>> {
806 match (self, object, checksum) {
807 (SourceKind::Http(http), ObjectKind::Http(object), ChecksumKind::Http(checksum)) => {
808 http.get(object, checksum, range)
809 .map(|result| result.context("http"))
810 .boxed()
811 }
812 (SourceKind::AwsS3(s3), ObjectKind::AwsS3(object), ChecksumKind::AwsS3(checksum)) => s3
813 .get(object, checksum, range)
814 .map(|result| result.context("aws_s3"))
815 .boxed(),
816 (SourceKind::AwsS3(_) | SourceKind::Http(_), _, _) => {
817 unreachable!("programming error! wrong source, object, and checksum kind");
818 }
819 }
820 }
821}
822
/// Type-erased [`OneshotObject`], with one variant per [`SourceKind`] variant.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) enum ObjectKind {
    Http(HttpObject),
    AwsS3(S3Object),
}
829
830impl OneshotObject for ObjectKind {
831 fn name(&self) -> &str {
832 match self {
833 ObjectKind::Http(object) => object.name(),
834 ObjectKind::AwsS3(object) => object.name(),
835 }
836 }
837
838 fn path(&self) -> &str {
839 match self {
840 ObjectKind::Http(object) => object.path(),
841 ObjectKind::AwsS3(object) => object.path(),
842 }
843 }
844
845 fn size(&self) -> usize {
846 match self {
847 ObjectKind::Http(object) => object.size(),
848 ObjectKind::AwsS3(object) => object.size(),
849 }
850 }
851
852 fn encodings(&self) -> &[Encoding] {
853 match self {
854 ObjectKind::Http(object) => object.encodings(),
855 ObjectKind::AwsS3(object) => object.encodings(),
856 }
857 }
858}
859
/// Type-erased checksum that pairs with an [`ObjectKind`], with one variant per
/// [`SourceKind`] variant.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) enum ChecksumKind {
    Http(HttpChecksum),
    AwsS3(S3Checksum),
}

/// A data format that can split objects into work requests, fetch those requests as record
/// chunks, and decode chunks into [`Row`]s.
pub trait OneshotFormat: Clone {
    /// A unit of work produced by [`OneshotFormat::split_work`] for source `S`.
    ///
    /// Work requests cross worker boundaries (they flow through a `Distribute` pact when
    /// rendered), hence the serialization bounds.
    type WorkRequest<S>: Debug + Clone + Send + Serialize + DeserializeOwned + 'static
    where
        S: OneshotSource;
    /// A chunk of not-yet-decoded records produced by [`OneshotFormat::fetch_work`].
    type RecordChunk: Debug + Clone + Send + Serialize + DeserializeOwned + 'static;

    /// Splits a single `object` (with its `checksum`) from `source` into work requests.
    fn split_work<S: OneshotSource + Send>(
        &self,
        source: S,
        object: S::Object,
        checksum: S::Checksum,
    ) -> impl Future<Output = Result<Vec<Self::WorkRequest<S>>, StorageErrorX>> + Send;

    /// Fetches the data for `request` from `source`, yielding record chunks as a stream.
    fn fetch_work<'a, S: OneshotSource + Sync + 'static>(
        &'a self,
        source: &'a S,
        request: Self::WorkRequest<S>,
    ) -> BoxStream<'a, Result<Self::RecordChunk, StorageErrorX>>;

    /// Decodes `chunk`, appending the resulting rows to `rows`.
    ///
    /// NOTE(review): the returned `usize` is presumably the number of rows appended —
    /// confirm against implementors.
    fn decode_chunk(
        &self,
        chunk: Self::RecordChunk,
        rows: &mut Vec<Row>,
    ) -> Result<usize, StorageErrorX>;
}

/// Type-erased [`OneshotFormat`], with one variant per supported format.
#[derive(Clone, Debug)]
pub(crate) enum FormatKind {
    Csv(CsvDecoder),
    Parquet(ParquetFormat),
}
914
915impl OneshotFormat for FormatKind {
916 type WorkRequest<S>
917 = RequestKind<S::Object, S::Checksum>
918 where
919 S: OneshotSource;
920 type RecordChunk = RecordChunkKind;
921
922 async fn split_work<S: OneshotSource + Send>(
923 &self,
924 source: S,
925 object: S::Object,
926 checksum: S::Checksum,
927 ) -> Result<Vec<Self::WorkRequest<S>>, StorageErrorX> {
928 match self {
929 FormatKind::Csv(csv) => {
930 let work = csv
931 .split_work(source, object, checksum)
932 .await
933 .context("csv")?
934 .into_iter()
935 .map(RequestKind::Csv)
936 .collect();
937 Ok(work)
938 }
939 FormatKind::Parquet(parquet) => {
940 let work = parquet
941 .split_work(source, object, checksum)
942 .await
943 .context("parquet")?
944 .into_iter()
945 .map(RequestKind::Parquet)
946 .collect();
947 Ok(work)
948 }
949 }
950 }
951
952 fn fetch_work<'a, S: OneshotSource + Sync + 'static>(
953 &'a self,
954 source: &'a S,
955 request: Self::WorkRequest<S>,
956 ) -> BoxStream<'a, Result<Self::RecordChunk, StorageErrorX>> {
957 match (self, request) {
958 (FormatKind::Csv(csv), RequestKind::Csv(request)) => csv
959 .fetch_work(source, request)
960 .map_ok(RecordChunkKind::Csv)
961 .map(|result| result.context("csv"))
962 .boxed(),
963 (FormatKind::Parquet(parquet), RequestKind::Parquet(request)) => parquet
964 .fetch_work(source, request)
965 .map_ok(RecordChunkKind::Parquet)
966 .map(|result| result.context("parquet"))
967 .boxed(),
968 (FormatKind::Parquet(_), RequestKind::Csv(_))
969 | (FormatKind::Csv(_), RequestKind::Parquet(_)) => {
970 unreachable!("programming error, {self:?}")
971 }
972 }
973 }
974
975 fn decode_chunk(
976 &self,
977 chunk: Self::RecordChunk,
978 rows: &mut Vec<Row>,
979 ) -> Result<usize, StorageErrorX> {
980 match (self, chunk) {
981 (FormatKind::Csv(csv), RecordChunkKind::Csv(chunk)) => {
982 csv.decode_chunk(chunk, rows).context("csv")
983 }
984 (FormatKind::Parquet(parquet), RecordChunkKind::Parquet(chunk)) => {
985 parquet.decode_chunk(chunk, rows).context("parquet")
986 }
987 (FormatKind::Parquet(_), RecordChunkKind::Csv(_))
988 | (FormatKind::Csv(_), RecordChunkKind::Parquet(_)) => {
989 unreachable!("programming error, {self:?}")
990 }
991 }
992 }
993}
994
/// Type-erased work request produced by `FormatKind`, with one variant per format.
///
/// `O` and `C` are the object and checksum types of the source the request was split from.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub(crate) enum RequestKind<O, C> {
    Csv(CsvWorkRequest<O, C>),
    Parquet(ParquetWorkRequest<O, C>),
}

/// Type-erased record chunk produced by `FormatKind`, with one variant per format.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub(crate) enum RecordChunkKind {
    Csv(CsvRecord),
    Parquet(ParquetRowGroup),
}
1006
1007pub(crate) enum ObjectFilter {
1008 None,
1009 Files(BTreeSet<Box<str>>),
1010 Pattern(glob::Pattern),
1011}
1012
1013impl ObjectFilter {
1014 pub fn try_new(filter: ContentFilter) -> Result<Self, anyhow::Error> {
1015 match filter {
1016 ContentFilter::None => Ok(ObjectFilter::None),
1017 ContentFilter::Files(files) => {
1018 let files = files.into_iter().map(|f| f.into()).collect();
1019 Ok(ObjectFilter::Files(files))
1020 }
1021 ContentFilter::Pattern(pattern) => {
1022 let pattern = glob::Pattern::new(&pattern)?;
1023 Ok(ObjectFilter::Pattern(pattern))
1024 }
1025 }
1026 }
1027
1028 pub fn filter<S: OneshotSource>(&self, object: &S::Object) -> bool {
1030 match self {
1031 ObjectFilter::None => true,
1032 ObjectFilter::Files(files) => files.contains(object.path()),
1033 ObjectFilter::Pattern(pattern) => pattern.matches(object.path()),
1034 }
1035 }
1036}
1037
1038#[derive(Debug, Clone, Deserialize, Serialize)]
1045pub struct StorageErrorX {
1046 kind: StorageErrorXKind,
1047 context: LinkedList<String>,
1048}
1049
1050impl fmt::Display for StorageErrorX {
1051 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1052 writeln!(f, "error: {}", self.kind)?;
1053 writeln!(f, "causes: {:?}", self.context)?;
1054 Ok(())
1055 }
1056}
1057
/// The specific failure behind a [`StorageErrorX`].
///
/// Message-carrying variants hold the upstream error already rendered to a string, so the
/// type can derive `Clone`, `Serialize` and `Deserialize`.
#[derive(Debug, Clone, Serialize, Deserialize, thiserror::Error)]
pub enum StorageErrorXKind {
    /// Converted from a `csv_async::Error`.
    #[error("csv decoding error: {0}")]
    CsvDecoding(Arc<str>),
    /// Converted from a Parquet error.
    #[error("parquet error: {0}")]
    ParquetError(Arc<str>),
    /// Converted from a `reqwest::Error`, including its chain of causes.
    #[error("reqwest error: {0}")]
    Reqwest(Arc<str>),
    // NOTE(review): holds a `String` rather than the `Arc<str>` its siblings use; consider
    // aligning once all constructors can be updated.
    #[error("aws s3 request error: {0}")]
    AwsS3Request(String),
    /// Converted from an AWS byte-stream error, with its error context included.
    #[error("aws s3 bytestream error: {0}")]
    AwsS3Bytes(Arc<str>),
    /// Converted from a header value that could not be read as a string.
    #[error("invalid reqwest header: {0}")]
    InvalidHeader(Arc<str>),
    /// A row failed validation against the collection's relation description.
    #[error("failed to decode Row from a record batch: {0}")]
    InvalidRecordBatch(Arc<str>),
    #[error("programming error: {0}")]
    ProgrammingError(Arc<str>),
    #[error("failed to get the size of an object")]
    MissingSize,
    #[error("object is missing the required '{0}' field")]
    MissingField(Arc<str>),
    /// Evaluating the source MFP on a decoded row failed.
    #[error("failed while evaluating the provided mfp: '{0}'")]
    MfpEvalError(Arc<str>),
    /// Discovery found no object that passed the filter.
    #[error("no matching files found at the given url")]
    NoMatchingFiles,
    /// The server answered with a redirect; the payload is the HTTP status code.
    #[error("server returned HTTP {0}; Redirects are not supported, use the final URL directly.")]
    Redirect(u16),
    #[error("something went wrong: {0}")]
    Generic(String),
}
1090
1091impl From<csv_async::Error> for StorageErrorXKind {
1092 fn from(err: csv_async::Error) -> Self {
1093 StorageErrorXKind::CsvDecoding(err.to_string().into())
1094 }
1095}
1096
1097impl From<reqwest::Error> for StorageErrorXKind {
1098 fn from(err: reqwest::Error) -> Self {
1099 StorageErrorXKind::Reqwest(err.to_string_with_causes().into())
1104 }
1105}
1106
1107impl From<reqwest::header::ToStrError> for StorageErrorXKind {
1108 fn from(err: reqwest::header::ToStrError) -> Self {
1109 StorageErrorXKind::InvalidHeader(err.to_string().into())
1110 }
1111}
1112
1113impl From<aws_smithy_types::byte_stream::error::Error> for StorageErrorXKind {
1114 fn from(err: aws_smithy_types::byte_stream::error::Error) -> Self {
1115 StorageErrorXKind::AwsS3Bytes(DisplayErrorContext(err).to_string().into())
1116 }
1117}
1118
1119impl From<::parquet::errors::ParquetError> for StorageErrorXKind {
1120 fn from(err: ::parquet::errors::ParquetError) -> Self {
1121 StorageErrorXKind::ParquetError(err.to_string().into())
1122 }
1123}
1124
1125impl StorageErrorXKind {
1126 pub fn with_context<C: Display>(self, context: C) -> StorageErrorX {
1127 StorageErrorX {
1128 kind: self,
1129 context: LinkedList::from([context.to_string()]),
1130 }
1131 }
1132
1133 pub fn invalid_record_batch<S: Into<Arc<str>>>(error: S) -> StorageErrorXKind {
1134 StorageErrorXKind::InvalidRecordBatch(error.into())
1135 }
1136
1137 pub fn generic<C: Display>(error: C) -> StorageErrorXKind {
1138 StorageErrorXKind::Generic(error.to_string())
1139 }
1140
1141 pub fn programming_error<S: Into<Arc<str>>>(error: S) -> StorageErrorXKind {
1142 StorageErrorXKind::ProgrammingError(error.into())
1143 }
1144}
1145
1146impl<E> From<E> for StorageErrorX
1147where
1148 E: Into<StorageErrorXKind>,
1149{
1150 fn from(err: E) -> Self {
1151 StorageErrorX {
1152 kind: err.into(),
1153 context: LinkedList::new(),
1154 }
1155 }
1156}
1157
1158trait StorageErrorXContext<T> {
1159 fn context<C>(self, context: C) -> Result<T, StorageErrorX>
1160 where
1161 C: Display;
1162}
1163
1164impl<T, E> StorageErrorXContext<T> for Result<T, E>
1165where
1166 E: Into<StorageErrorXKind>,
1167{
1168 fn context<C>(self, context: C) -> Result<T, StorageErrorX>
1169 where
1170 C: Display,
1171 {
1172 match self {
1173 Ok(val) => Ok(val),
1174 Err(kind) => Err(StorageErrorX {
1175 kind: kind.into(),
1176 context: LinkedList::from([context.to_string()]),
1177 }),
1178 }
1179 }
1180}
1181
1182impl<T> StorageErrorXContext<T> for Result<T, StorageErrorX> {
1183 fn context<C>(self, context: C) -> Result<T, StorageErrorX>
1184 where
1185 C: Display,
1186 {
1187 match self {
1188 Ok(val) => Ok(val),
1189 Err(mut e) => {
1190 e.context.push_back(context.to_string());
1191 Err(e)
1192 }
1193 }
1194 }
1195}