Skip to main content

mz_storage_operators/
oneshot_source.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! "Oneshot" sources are a one-time ingestion of data from an external system, unlike traditional
11//! sources, they __do not__ run continuously. Oneshot sources are generally used for `COPY FROM`
12//! SQL statements.
13//!
14//! The implementation of reading and parsing data is behind the [`OneshotSource`] and
15//! [`OneshotFormat`] traits, respectively. Users looking to add new sources or formats, should
16//! only need to add new implementations for these traits.
17//!
18//! * [`OneshotSource`] is an interface for listing and reading from an external system, e.g. an
19//!   HTTP server.
20//! * [`OneshotFormat`] is an interface for how to parallelize and parse data, e.g. CSV.
21//!
22//! Given a [`OneshotSource`] and a [`OneshotFormat`] we build a dataflow structured like the
23//! following:
24//!
25//! ```text
26//!             ┏━━━━━━━━━━━━━━━┓
27//!             ┃    Discover   ┃
28//!             ┃    objects    ┃
29//!             ┗━━━━━━━┯━━━━━━━┛
30//!           ┌───< Distribute >───┐
31//!           │                    │
32//!     ┏━━━━━v━━━━┓         ┏━━━━━v━━━━┓
33//!     ┃  Split   ┃   ...   ┃  Split   ┃
34//!     ┃  Work 1  ┃         ┃  Work n  ┃
35//!     ┗━━━━━┯━━━━┛         ┗━━━━━┯━━━━┛
36//!           │                    │
37//!           ├───< Distribute >───┤
38//!           │                    │
39//!     ┏━━━━━v━━━━┓         ┏━━━━━v━━━━┓
40//!     ┃  Fetch   ┃   ...   ┃  Fetch   ┃
41//!     ┃  Work 1  ┃         ┃  Work n  ┃
42//!     ┗━━━━━┯━━━━┛         ┗━━━━━┯━━━━┛
43//!           │                    │
44//!           ├───< Distribute >───┤
45//!           │                    │
46//!     ┏━━━━━v━━━━┓         ┏━━━━━v━━━━┓
47//!     ┃  Decode  ┃   ...   ┃  Decode  ┃
48//!     ┃  Chunk 1 ┃         ┃  Chunk n ┃
49//!     ┗━━━━━┯━━━━┛         ┗━━━━━┯━━━━┛
50//!           │                    │
51//!           │                    │
52//!     ┏━━━━━v━━━━┓         ┏━━━━━v━━━━┓
53//!     ┃  Stage   ┃   ...   ┃  Stage   ┃
54//!     ┃  Batch 1 ┃         ┃  Batch n ┃
55//!     ┗━━━━━┯━━━━┛         ┗━━━━━┯━━━━┛
56//!           │                    │
57//!           └─────────┬──────────┘
58//!               ┏━━━━━v━━━━┓
59//!               ┃  Result  ┃
60//!               ┃ Callback ┃
61//!               ┗━━━━━━━━━━┛
62//! ```
63//!
64
65use std::fmt;
66use std::sync::Arc;
67
68use aws_sdk_s3::error::DisplayErrorContext;
69use bytes::Bytes;
70use differential_dataflow::Hashable;
71use futures::stream::BoxStream;
72use futures::{StreamExt, TryStreamExt};
73use mz_expr::SafeMfpPlan;
74use mz_ore::cast::CastFrom;
75use mz_persist_client::Diagnostics;
76use mz_persist_client::batch::ProtoBatch;
77use mz_persist_client::cache::PersistClientCache;
78use mz_persist_types::Codec;
79use mz_persist_types::codec_impls::UnitSchema;
80use mz_repr::{DatumVec, GlobalId, Row, RowArena, Timestamp};
81use mz_storage_types::StorageDiff;
82use mz_storage_types::connections::ConnectionContext;
83use mz_storage_types::controller::CollectionMetadata;
84use mz_storage_types::oneshot_sources::{
85    ContentFilter, ContentFormat, ContentSource, OneshotIngestionRequest,
86};
87use mz_storage_types::sources::SourceData;
88use mz_timely_util::builder_async::{
89    Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
90};
91use mz_timely_util::pact::Distribute;
92use serde::de::DeserializeOwned;
93use serde::{Deserialize, Serialize};
94use std::collections::{BTreeSet, LinkedList};
95use std::fmt::{Debug, Display};
96use std::future::Future;
97use timely::container::CapacityContainerBuilder;
98use timely::dataflow::channels::pact::Pipeline;
99use timely::dataflow::{Scope, StreamVec};
100use timely::progress::Antichain;
101use tracing::info;
102
103use crate::oneshot_source::aws_source::{AwsS3Source, S3Checksum, S3Object};
104use crate::oneshot_source::csv::{CsvDecoder, CsvRecord, CsvWorkRequest};
105use crate::oneshot_source::http_source::{HttpChecksum, HttpObject, HttpOneshotSource};
106use crate::oneshot_source::parquet::{ParquetFormat, ParquetRowGroup, ParquetWorkRequest};
107
108pub mod csv;
109pub mod parquet;
110
111pub mod aws_source;
112pub mod http_source;
113
114mod util;
115
116/// Render a dataflow to do a "oneshot" ingestion.
117///
118/// Roughly the operators we render do the following:
119///
120/// 1. Discover objects with a [`OneshotSource`].
121/// 2. Split objects into separate units of work based on the [`OneshotFormat`].
122/// 3. Fetch individual units of work (aka fetch byte blobs) with the
123///    [`OneshotFormat`] and [`OneshotSource`].
124/// 4. Decode the fetched byte blobs into [`Row`]s.
125/// 5. Stage the [`Row`]s into Persist returning [`ProtoBatch`]es.
126///
127/// TODO(cf3): Benchmark combining operators 3, 4, and 5. Currently we keep them
128/// separate for the [`CsvDecoder`]. CSV decoding is hard to do in parallel so we
129/// currently have a single worker Fetch an entire file, and then distributes
130/// chunks for parallel Decoding. We should benchmark if this is actually faster
131/// than just a single worker both fetching and decoding.
132///
133pub fn render<G, F>(
134    scope: G,
135    persist_clients: Arc<PersistClientCache>,
136    connection_context: ConnectionContext,
137    collection_id: GlobalId,
138    collection_meta: CollectionMetadata,
139    request: OneshotIngestionRequest,
140    worker_callback: F,
141) -> Vec<PressOnDropButton>
142where
143    G: Scope<Timestamp = Timestamp>,
144    F: FnOnce(Result<Option<ProtoBatch>, String>) -> () + 'static,
145{
146    let OneshotIngestionRequest {
147        source,
148        format,
149        filter,
150        shape,
151    } = request;
152
153    let source = match source {
154        ContentSource::Http { url } => {
155            let source = HttpOneshotSource::new(reqwest::Client::default(), url);
156            SourceKind::Http(source)
157        }
158        ContentSource::AwsS3 {
159            connection,
160            connection_id,
161            uri,
162        } => {
163            // Checksum validation does not work with GCS when using ranges, which happens with parquet.
164            // So, we disable checksum if both the endpoint is overridden to a non-AWS endpoint and the format is Parquet.
165            let use_checksum = !(connection.endpoint.is_some()
166                && !connection.endpoint.as_ref().unwrap().contains("amazonaws"))
167                || format != ContentFormat::Parquet;
168            if !use_checksum {
169                tracing::info!(
170                    "disabling checksum validation for S3 source because endpoint: {:?} is overridden and format is Parquet",
171                    connection.endpoint
172                );
173            }
174            let source = AwsS3Source::new(
175                connection,
176                connection_id,
177                connection_context,
178                uri,
179                use_checksum,
180            );
181            SourceKind::AwsS3(source)
182        }
183    };
184    tracing::info!(?source, "created oneshot source");
185
186    let format = match format {
187        ContentFormat::Csv(params) => {
188            let format = CsvDecoder::new(params, &shape.source_desc);
189            FormatKind::Csv(format)
190        }
191        ContentFormat::Parquet => {
192            let format = ParquetFormat::new(shape.source_desc);
193            FormatKind::Parquet(format)
194        }
195    };
196
197    // Discover what objects are available to copy.
198    let (objects_stream, discover_token) =
199        render_discover_objects(scope.clone(), collection_id, source.clone(), filter);
200    // Split the objects into individual units of work.
201    let (work_stream, split_token) = render_split_work(
202        scope.clone(),
203        collection_id,
204        objects_stream,
205        source.clone(),
206        format.clone(),
207    );
208    // Fetch each unit of work, returning chunks of records.
209    let (records_stream, fetch_token) = render_fetch_work(
210        scope.clone(),
211        collection_id,
212        source.clone(),
213        format.clone(),
214        work_stream,
215    );
216    // Parse chunks of records into Rows.
217    let (rows_stream, decode_token) = render_decode_chunk(
218        scope.clone(),
219        format.clone(),
220        records_stream,
221        shape.source_mfp,
222    );
223    // Stage the Rows in Persist.
224    let (batch_stream, batch_token) = render_stage_batches_operator(
225        scope.clone(),
226        collection_id,
227        &collection_meta,
228        persist_clients,
229        rows_stream,
230    );
231
232    // Collect all results together and notify the upstream of whether or not we succeeded.
233    render_completion_operator(scope, batch_stream, worker_callback);
234
235    let tokens = vec![
236        discover_token,
237        split_token,
238        fetch_token,
239        decode_token,
240        batch_token,
241    ];
242
243    tokens
244}
245
/// Render an operator that using a [`OneshotSource`] will discover what objects are available
/// for fetching.
///
/// Only one worker (chosen by hashing `collection_id`) does the listing; all other workers
/// emit nothing. Errors are emitted on the output stream rather than panicking, so
/// downstream operators can surface them to the user.
pub fn render_discover_objects<G, S>(
    scope: G,
    collection_id: GlobalId,
    source: S,
    filter: ContentFilter,
) -> (
    StreamVec<G, Result<(S::Object, S::Checksum), StorageErrorX>>,
    PressOnDropButton,
)
where
    G: Scope<Timestamp = Timestamp>,
    S: OneshotSource + 'static,
{
    // Only a single worker is responsible for discovering objects.
    let worker_id = scope.index();
    let num_workers = scope.peers();
    // Every worker computes the same hash, so they all agree on who is active
    // without any coordination.
    let active_worker_id = usize::cast_from((collection_id, "discover").hashed()) % num_workers;
    let is_active_worker = worker_id == active_worker_id;

    let mut builder = AsyncOperatorBuilder::new("CopyFrom-discover".to_string(), scope.clone());

    let (start_handle, start_stream) = builder.new_output::<CapacityContainerBuilder<_>>();

    let shutdown = builder.build(move |caps| async move {
        let [start_cap] = caps.try_into().unwrap();

        // Inactive workers return immediately, dropping their capability.
        if !is_active_worker {
            return;
        }

        // Compile the user-provided filter; a bad filter becomes an error on the
        // output stream instead of a panic.
        let filter = match ObjectFilter::try_new(filter) {
            Ok(filter) => filter,
            Err(err) => {
                tracing::warn!(?err, "failed to create filter");
                start_handle.give(&start_cap, Err(StorageErrorXKind::generic(err).into()));
                return;
            }
        };

        // List all objects from the source, then partition them by the filter.
        let work = source.list().await.context("list");
        match work {
            Ok(objects) => {
                let (include, exclude): (Vec<_>, Vec<_>) = objects
                    .into_iter()
                    .partition(|(o, _checksum)| filter.filter::<S>(o));
                tracing::info!(%worker_id, ?include, ?exclude, "listed objects");
                // Matching zero objects is surfaced as an error: the user asked to
                // copy something that doesn't exist.
                if include.is_empty() {
                    let err = StorageErrorXKind::NoMatchingFiles.with_context("discover");
                    start_handle.give(&start_cap, Err(err));
                    return;
                }
                // Emit each discovered (object, checksum) pair downstream.
                include
                    .into_iter()
                    .for_each(|object| start_handle.give(&start_cap, Ok(object)))
            }
            Err(err) => {
                tracing::warn!(?err, "failed to list oneshot source");
                start_handle.give(&start_cap, Err(err))
            }
        }
    });

    (start_stream, shutdown.press_on_drop())
}
312
/// Render an operator that given a stream of [`OneshotSource::Object`]s will split them into units
/// of work based on the provided [`OneshotFormat`].
///
/// Upstream errors are forwarded downstream; each object that fails to split produces a
/// single error on the output stream instead of halting the operator.
pub fn render_split_work<G, S, F>(
    scope: G,
    collection_id: GlobalId,
    objects: StreamVec<G, Result<(S::Object, S::Checksum), StorageErrorX>>,
    source: S,
    format: F,
) -> (
    StreamVec<G, Result<F::WorkRequest<S>, StorageErrorX>>,
    PressOnDropButton,
)
where
    G: Scope,
    S: OneshotSource + Send + Sync + 'static,
    F: OneshotFormat + Send + Sync + 'static,
{
    let worker_id = scope.index();
    let mut builder = AsyncOperatorBuilder::new("CopyFrom-split_work".to_string(), scope.clone());

    let (request_handle, request_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
    // `Distribute` spreads the discovered objects across workers so splitting can
    // happen in parallel.
    let mut objects_handle = builder.new_input_for(objects, Distribute, &request_handle);

    let shutdown = builder.build(move |caps| async move {
        let [_objects_cap] = caps.try_into().unwrap();

        info!(%collection_id, %worker_id, "CopyFrom Split Work");

        while let Some(event) = objects_handle.next().await {
            // Progress events carry no objects; skip them.
            let (capability, maybe_objects) = match event {
                AsyncEvent::Data(cap, req) => (cap, req),
                AsyncEvent::Progress(_) => continue,
            };

            // Nest the `split_work(...)` method in an async-block so we can use the `?`
            // without returning from the entire operator, and so we can add context.
            let result = async {
                let mut requests = Vec::new();

                for maybe_object in maybe_objects {
                    // Return early if the upstream Discover step failed.
                    let (object, checksum) = maybe_object?;

                    // Run the split on a separate task; the clones are needed because the
                    // task requires 'static ownership of the source and format.
                    let format_ = format.clone();
                    let source_ = source.clone();
                    let work_requests = mz_ore::task::spawn(|| "split-work", async move {
                        info!(%worker_id, object = %object.name(), "splitting object");
                        format_.split_work(source_.clone(), object, checksum).await
                    })
                    .await?;

                    requests.extend(work_requests);
                }

                Ok::<_, StorageErrorX>(requests)
            }
            .await
            .context("split");

            // Emit either all work requests, or the single error, at this capability.
            match result {
                Ok(requests) => requests
                    .into_iter()
                    .for_each(|req| request_handle.give(&capability, Ok(req))),
                Err(err) => request_handle.give(&capability, Err(err)),
            }
        }
    });

    (request_stream, shutdown.press_on_drop())
}
383
/// Render an operator that given a stream [`OneshotFormat::WorkRequest`]s will fetch chunks of the
/// remote [`OneshotSource::Object`] and return a stream of [`OneshotFormat::RecordChunk`]s that
/// can be decoded into [`Row`]s.
///
/// Chunks are emitted as they arrive rather than buffered per-request, since a single work
/// request may cover an entire file.
pub fn render_fetch_work<G, S, F>(
    scope: G,
    collection_id: GlobalId,
    source: S,
    format: F,
    work_requests: StreamVec<G, Result<F::WorkRequest<S>, StorageErrorX>>,
) -> (
    StreamVec<G, Result<F::RecordChunk, StorageErrorX>>,
    PressOnDropButton,
)
where
    G: Scope,
    S: OneshotSource + Sync + 'static,
    F: OneshotFormat + Sync + 'static,
{
    let worker_id = scope.index();
    let mut builder = AsyncOperatorBuilder::new("CopyFrom-fetch_work".to_string(), scope.clone());

    let (record_handle, record_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
    // `Distribute` spreads the work requests across workers so fetching can happen
    // in parallel.
    let mut work_requests_handle = builder.new_input_for(work_requests, Distribute, &record_handle);

    let shutdown = builder.build(move |caps| async move {
        let [_work_cap] = caps.try_into().unwrap();

        info!(%collection_id, %worker_id, "CopyFrom Fetch Work");

        while let Some(event) = work_requests_handle.next().await {
            // Progress events carry no requests; skip them.
            let (capability, maybe_requests) = match event {
                AsyncEvent::Data(cap, req) => (cap, req),
                AsyncEvent::Progress(_) => continue,
            };

            // Wrap our work in a block to capture `?`.
            let result = async {
                // Process each stream of work, one at a time.
                for maybe_request in maybe_requests {
                    // Bail out if an upstream step already failed for this request.
                    let request = maybe_request?;

                    let mut work_stream = format.fetch_work(&source, request);
                    while let Some(result) = work_stream.next().await {
                        // Returns early and stop consuming from the stream if we hit an error.
                        let record_chunk = result.context("fetch worker")?;

                        // Note: We want to give record chunks as we receive them, because a work
                        // request may be for an entire file.
                        //
                        // TODO(cf3): Investigate if some small batching here would help perf.
                        record_handle.give(&capability, Ok(record_chunk));
                    }
                }

                Ok::<_, StorageErrorX>(())
            }
            .await
            .context("fetch work");

            // Surface fetch failures downstream instead of crashing the operator.
            if let Err(err) = result {
                tracing::warn!(?err, "failed to fetch");
                record_handle.give(&capability, Err(err))
            }
        }
    });

    (record_stream, shutdown.press_on_drop())
}
452
/// Render an operator that given a stream of [`OneshotFormat::RecordChunk`]s will decode these
/// chunks into a stream of [`Row`]s.
///
/// Each decoded row is passed through the provided [`SafeMfpPlan`] to re-arrange columns
/// and/or fill in defaults before being emitted.
pub fn render_decode_chunk<G, F>(
    scope: G,
    format: F,
    record_chunks: StreamVec<G, Result<F::RecordChunk, StorageErrorX>>,
    mfp: SafeMfpPlan,
) -> (StreamVec<G, Result<Row, StorageErrorX>>, PressOnDropButton)
where
    G: Scope,
    F: OneshotFormat + 'static,
{
    let mut builder = AsyncOperatorBuilder::new("CopyFrom-decode_chunk".to_string(), scope.clone());

    let (row_handle, row_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
    // `Distribute` spreads record chunks across workers so decoding can happen in parallel.
    let mut record_chunk_handle = builder.new_input_for(record_chunks, Distribute, &row_handle);

    let shutdown = builder.build(move |caps| async move {
        let [_row_cap] = caps.try_into().unwrap();

        // Scratch space reused for every row to avoid per-row allocations.
        let mut datum_vec = DatumVec::default();
        let row_arena = RowArena::default();
        let mut row_buf = Row::default();

        while let Some(event) = record_chunk_handle.next().await {
            // Progress events carry no chunks; skip them.
            let (capability, maybe_chunks) = match event {
                AsyncEvent::Data(cap, data) => (cap, data),
                AsyncEvent::Progress(_) => continue,
            };

            // Wrap decoding in an async block so `?` adds context without returning
            // from the entire operator.
            let result = async {
                let mut rows = Vec::new();
                for maybe_chunk in maybe_chunks {
                    let chunk = maybe_chunk?;
                    format.decode_chunk(chunk, &mut rows)?;
                }
                Ok::<_, StorageErrorX>(rows)
            }
            .await
            .context("decode chunk");

            match result {
                Ok(rows) => {
                    // For each row of source data, we pass it through an MFP to re-arrange column
                    // orders and/or fill in default values for missing columns.
                    for row in rows {
                        let mut datums = datum_vec.borrow_with(&row);
                        let result = mfp
                            .evaluate_into(&mut *datums, &row_arena, &mut row_buf)
                            .map(|row| row.cloned());

                        match result {
                            Ok(Some(row)) => row_handle.give(&capability, Ok(row)),
                            Ok(None) => {
                                // We only expect the provided MFP to map rows from the source data
                                // and project default values.
                                mz_ore::soft_panic_or_log!("oneshot source MFP filtered out data!");
                            }
                            Err(e) => {
                                // MFP evaluation failures become errors on the output stream.
                                let err = StorageErrorXKind::MfpEvalError(e.to_string().into())
                                    .with_context("decode");
                                row_handle.give(&capability, Err(err))
                            }
                        }
                    }
                }
                Err(err) => row_handle.give(&capability, Err(err)),
            }
        }
    });

    (row_stream, shutdown.press_on_drop())
}
526
/// Render an operator that given a stream of [`Row`]s will stage them in Persist and return a
/// stream of [`ProtoBatch`]es that can later be linked into a shard.
///
/// On the first upstream error the in-progress batch is deleted (so no data leaks) and the
/// error is forwarded; otherwise a single [`ProtoBatch`] is emitted when the input completes.
pub fn render_stage_batches_operator<G>(
    scope: G,
    collection_id: GlobalId,
    collection_meta: &CollectionMetadata,
    persist_clients: Arc<PersistClientCache>,
    rows_stream: StreamVec<G, Result<Row, StorageErrorX>>,
) -> (
    StreamVec<G, Result<ProtoBatch, StorageErrorX>>,
    PressOnDropButton,
)
where
    G: Scope,
{
    let persist_location = collection_meta.persist_location.clone();
    let shard_id = collection_meta.data_shard;
    let collection_desc = Arc::new(collection_meta.relation_desc.clone());

    let mut builder =
        AsyncOperatorBuilder::new("CopyFrom-stage_batches".to_string(), scope.clone());

    let (proto_batch_handle, proto_batch_stream) =
        builder.new_output::<CapacityContainerBuilder<_>>();
    // `Pipeline` keeps rows on the worker that decoded them; no exchange is needed
    // for staging.
    let mut rows_handle = builder.new_input_for(rows_stream, Pipeline, &proto_batch_handle);

    let shutdown = builder.build(move |caps| async move {
        let [proto_batch_cap] = caps.try_into().unwrap();

        // Open a Persist handle that we can use to stage a batch.
        let persist_client = persist_clients
            .open(persist_location)
            .await
            .expect("failed to open Persist client");
        let persist_diagnostics = Diagnostics {
            shard_name: collection_id.to_string(),
            handle_purpose: "CopyFrom::stage_batches".to_string(),
        };
        let write_handle = persist_client
            .open_writer::<SourceData, (), mz_repr::Timestamp, StorageDiff>(
                shard_id,
                Arc::clone(&collection_desc),
                Arc::new(UnitSchema),
                persist_diagnostics,
            )
            .await
            .expect("could not open Persist shard");

        // Create a batch using the minimum timestamp since these batches will
        // get sent back to `environmentd` and their timestamps re-written
        // before being finally appended.
        let lower = mz_repr::Timestamp::MIN;
        let upper = Antichain::from_elem(lower.step_forward());

        let mut batch_builder = write_handle.builder(Antichain::from_elem(lower));

        while let Some(event) = rows_handle.next().await {
            // Only data events carry rows; ignore progress.
            let AsyncEvent::Data(_, row_batch) = event else {
                continue;
            };

            // Pull Rows off our stream and stage them into a Batch.
            for maybe_row in row_batch {
                // Validate each Row against the collection's schema before staging it.
                let maybe_row = maybe_row.and_then(|row| {
                    Row::validate(&row, &*collection_desc).map_err(|e| {
                        StorageErrorXKind::invalid_record_batch(e).with_context("stage_batches")
                    })?;
                    Ok(row)
                });
                match maybe_row {
                    // Happy path, add the Row to our batch!
                    Ok(row) => {
                        let data = SourceData(Ok(row));
                        batch_builder
                            .add(&data, &(), &lower, &1)
                            .await
                            .expect("failed to add Row to batch");
                    }
                    // Sad path, something upstream hit an error.
                    Err(err) => {
                        // Clean up our in-progress batch so we don't leak data.
                        let batch = batch_builder
                            .finish(upper)
                            .await
                            .expect("failed to cleanup batch");
                        batch.delete().await;

                        // Pass on the error.
                        proto_batch_handle
                            .give(&proto_batch_cap, Err(err).context("stage batches"));
                        return;
                    }
                }
            }
        }

        let batch = batch_builder
            .finish(upper)
            .await
            .expect("failed to create Batch");

        // Turn our Batch into a ProtoBatch that will later be linked in to
        // the shard.
        //
        // Note: By turning this into a ProtoBatch, the onus is now on us to
        // cleanup the Batch if it's never linked into the shard.
        //
        // TODO(cf2): Make sure these batches get cleaned up if another
        // worker encounters an error.
        let proto_batch = batch.into_transmittable_batch();
        proto_batch_handle.give(&proto_batch_cap, Ok(proto_batch));
    });

    (proto_batch_stream, shutdown.press_on_drop())
}
642
643/// Render an operator that given a stream of [`ProtoBatch`]es will call our `worker_callback` to
644/// report the results upstream.
645pub fn render_completion_operator<G, F>(
646    scope: G,
647    results_stream: StreamVec<G, Result<ProtoBatch, StorageErrorX>>,
648    worker_callback: F,
649) where
650    G: Scope,
651    F: FnOnce(Result<Option<ProtoBatch>, String>) -> () + 'static,
652{
653    let mut builder = AsyncOperatorBuilder::new("CopyFrom-completion".to_string(), scope.clone());
654    let mut results_input = builder.new_disconnected_input(results_stream, Pipeline);
655
656    builder.build(move |_| async move {
657        let result = async move {
658            let mut maybe_payload: Option<ProtoBatch> = None;
659
660            while let Some(event) = results_input.next().await {
661                if let AsyncEvent::Data(_cap, results) = event {
662                    let [result] = results
663                        .try_into()
664                        .expect("only 1 event on the result stream");
665
666                    // TODO(cf2): Lift this restriction.
667                    if maybe_payload.is_some() {
668                        panic!("expected only one batch!");
669                    }
670
671                    maybe_payload = Some(result.map_err(|e| e.to_string())?);
672                }
673            }
674
675            Ok(maybe_payload)
676        }
677        .await;
678
679        // Report to the caller of our final status.
680        worker_callback(result);
681    });
682}
683
/// An object that will be fetched from a [`OneshotSource`].
pub trait OneshotObject {
    /// Name of the object, including any extensions.
    fn name(&self) -> &str;

    /// Path of the object within the remote source.
    fn path(&self) -> &str;

    /// Size of this object in bytes.
    fn size(&self) -> usize;

    /// Encodings of the _entire_ object, if any.
    ///
    /// Note: The object may internally use compression, e.g. a Parquet file
    /// could compress its column chunks, but if the Parquet file itself is not
    /// compressed then this would return `None`.
    fn encodings(&self) -> &[Encoding];
}
702
/// Encoding of a [`OneshotObject`].
#[derive(Debug, Copy, Clone, Serialize, Deserialize)]
pub enum Encoding {
    /// bzip2 compression.
    Bzip2,
    /// gzip compression.
    Gzip,
    /// xz compression.
    Xz,
    /// Zstandard compression.
    Zstd,
}
711
/// Defines a remote system that we can fetch data from for a "one time" ingestion.
pub trait OneshotSource: Clone + Send + Unpin {
    /// An individual unit within the source, e.g. a file.
    type Object: OneshotObject
        + Debug
        + Clone
        + Send
        + Unpin
        + Serialize
        + DeserializeOwned
        + 'static;
    /// Checksum for a [`Self::Object`].
    type Checksum: Debug + Clone + Send + Unpin + Serialize + DeserializeOwned + 'static;

    /// Returns all of the objects for this source.
    fn list<'a>(
        &'a self,
    ) -> impl Future<Output = Result<Vec<(Self::Object, Self::Checksum)>, StorageErrorX>> + Send;

    /// Returns a stream of the data for a specific object.
    ///
    /// `range`, when provided, restricts the fetch to that inclusive byte range
    /// of the object.
    fn get<'s>(
        &'s self,
        object: Self::Object,
        checksum: Self::Checksum,
        range: Option<std::ops::RangeInclusive<usize>>,
    ) -> BoxStream<'s, Result<Bytes, StorageErrorX>>;
}
739
/// An enum wrapper around [`OneshotSource`]s.
///
/// An alternative to this wrapper would be to use `Box<dyn OneshotSource>`, but that requires
/// making the trait object safe and it's easier to just wrap it in an enum. Also, this wrapper
/// provides a convenient place to add [`StorageErrorXContext::context`] for all of our source
/// types.
#[derive(Clone, Debug)]
pub(crate) enum SourceKind {
    /// Fetch objects from an HTTP server.
    Http(HttpOneshotSource),
    /// Fetch objects from AWS S3 (or an S3-compatible endpoint).
    AwsS3(AwsS3Source),
}
751
752impl OneshotSource for SourceKind {
753    type Object = ObjectKind;
754    type Checksum = ChecksumKind;
755
756    async fn list<'a>(&'a self) -> Result<Vec<(Self::Object, Self::Checksum)>, StorageErrorX> {
757        match self {
758            SourceKind::Http(http) => {
759                let objects = http.list().await.context("http")?;
760                let objects = objects
761                    .into_iter()
762                    .map(|(object, checksum)| {
763                        (ObjectKind::Http(object), ChecksumKind::Http(checksum))
764                    })
765                    .collect();
766                Ok(objects)
767            }
768            SourceKind::AwsS3(s3) => {
769                let objects = s3.list().await.context("s3")?;
770                let objects = objects
771                    .into_iter()
772                    .map(|(object, checksum)| {
773                        (ObjectKind::AwsS3(object), ChecksumKind::AwsS3(checksum))
774                    })
775                    .collect();
776                Ok(objects)
777            }
778        }
779    }
780
781    fn get<'s>(
782        &'s self,
783        object: Self::Object,
784        checksum: Self::Checksum,
785        range: Option<std::ops::RangeInclusive<usize>>,
786    ) -> BoxStream<'s, Result<Bytes, StorageErrorX>> {
787        match (self, object, checksum) {
788            (SourceKind::Http(http), ObjectKind::Http(object), ChecksumKind::Http(checksum)) => {
789                http.get(object, checksum, range)
790                    .map(|result| result.context("http"))
791                    .boxed()
792            }
793            (SourceKind::AwsS3(s3), ObjectKind::AwsS3(object), ChecksumKind::AwsS3(checksum)) => s3
794                .get(object, checksum, range)
795                .map(|result| result.context("aws_s3"))
796                .boxed(),
797            (SourceKind::AwsS3(_) | SourceKind::Http(_), _, _) => {
798                unreachable!("programming error! wrong source, object, and checksum kind");
799            }
800        }
801    }
802}
803
/// Enum wrapper for [`OneshotSource::Object`], see [`SourceKind`] for more details.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) enum ObjectKind {
    /// An object from the HTTP source.
    Http(HttpObject),
    /// An object from the AWS S3 source.
    AwsS3(S3Object),
}
810
811impl OneshotObject for ObjectKind {
812    fn name(&self) -> &str {
813        match self {
814            ObjectKind::Http(object) => object.name(),
815            ObjectKind::AwsS3(object) => object.name(),
816        }
817    }
818
819    fn path(&self) -> &str {
820        match self {
821            ObjectKind::Http(object) => object.path(),
822            ObjectKind::AwsS3(object) => object.path(),
823        }
824    }
825
826    fn size(&self) -> usize {
827        match self {
828            ObjectKind::Http(object) => object.size(),
829            ObjectKind::AwsS3(object) => object.size(),
830        }
831    }
832
833    fn encodings(&self) -> &[Encoding] {
834        match self {
835            ObjectKind::Http(object) => object.encodings(),
836            ObjectKind::AwsS3(object) => object.encodings(),
837        }
838    }
839}
840
/// Enum wrapper for [`OneshotSource::Checksum`], see [`SourceKind`] for more details.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) enum ChecksumKind {
    /// A checksum from the HTTP source.
    Http(HttpChecksum),
    /// A checksum from the AWS S3 source.
    AwsS3(S3Checksum),
}
847
/// Defines a format that we fetch for a "one time" ingestion.
pub trait OneshotFormat: Clone {
    /// A single unit of work for decoding this format, e.g. a single Parquet RowGroup.
    type WorkRequest<S>: Debug + Clone + Send + Serialize + DeserializeOwned + 'static
    where
        S: OneshotSource;
    /// A chunk of records in this format that can be decoded into Rows.
    type RecordChunk: Debug + Clone + Send + Serialize + DeserializeOwned + 'static;

    /// Given an upstream object, defines how we should parse this object in parallel.
    ///
    /// Note: It's totally fine to not process an object in parallel, and just return a single
    /// [`Self::WorkRequest`] here.
    fn split_work<S: OneshotSource + Send>(
        &self,
        source: S,
        object: S::Object,
        checksum: S::Checksum,
    ) -> impl Future<Output = Result<Vec<Self::WorkRequest<S>>, StorageErrorX>> + Send;

    /// Given a work request, fetch data from the [`OneshotSource`] and return it in a format that
    /// can later be decoded.
    fn fetch_work<'a, S: OneshotSource + Sync + 'static>(
        &'a self,
        source: &'a S,
        request: Self::WorkRequest<S>,
    ) -> BoxStream<'a, Result<Self::RecordChunk, StorageErrorX>>;

    /// Decode a chunk of records into [`Row`]s, writing them into `rows`.
    ///
    /// NOTE(review): the returned `usize` appears to be the number of rows
    /// decoded — confirm against the format implementations.
    fn decode_chunk(
        &self,
        chunk: Self::RecordChunk,
        rows: &mut Vec<Row>,
    ) -> Result<usize, StorageErrorX>;
}
883
/// An enum wrapper around [`OneshotFormat`]s.
///
/// An alternative to this wrapper would be to use `Box<dyn OneshotFormat>`, but that requires
/// making the trait object safe and it's easier to just wrap it in an enum. Also, this wrapper
/// provides a convenient place to add [`StorageErrorXContext::context`] for all of our format
/// types.
#[derive(Clone, Debug)]
pub(crate) enum FormatKind {
    /// Decode the data as CSV.
    Csv(CsvDecoder),
    /// Decode the data as Parquet.
    Parquet(ParquetFormat),
}
895
896impl OneshotFormat for FormatKind {
897    type WorkRequest<S>
898        = RequestKind<S::Object, S::Checksum>
899    where
900        S: OneshotSource;
901    type RecordChunk = RecordChunkKind;
902
903    async fn split_work<S: OneshotSource + Send>(
904        &self,
905        source: S,
906        object: S::Object,
907        checksum: S::Checksum,
908    ) -> Result<Vec<Self::WorkRequest<S>>, StorageErrorX> {
909        match self {
910            FormatKind::Csv(csv) => {
911                let work = csv
912                    .split_work(source, object, checksum)
913                    .await
914                    .context("csv")?
915                    .into_iter()
916                    .map(RequestKind::Csv)
917                    .collect();
918                Ok(work)
919            }
920            FormatKind::Parquet(parquet) => {
921                let work = parquet
922                    .split_work(source, object, checksum)
923                    .await
924                    .context("parquet")?
925                    .into_iter()
926                    .map(RequestKind::Parquet)
927                    .collect();
928                Ok(work)
929            }
930        }
931    }
932
933    fn fetch_work<'a, S: OneshotSource + Sync + 'static>(
934        &'a self,
935        source: &'a S,
936        request: Self::WorkRequest<S>,
937    ) -> BoxStream<'a, Result<Self::RecordChunk, StorageErrorX>> {
938        match (self, request) {
939            (FormatKind::Csv(csv), RequestKind::Csv(request)) => csv
940                .fetch_work(source, request)
941                .map_ok(RecordChunkKind::Csv)
942                .map(|result| result.context("csv"))
943                .boxed(),
944            (FormatKind::Parquet(parquet), RequestKind::Parquet(request)) => parquet
945                .fetch_work(source, request)
946                .map_ok(RecordChunkKind::Parquet)
947                .map(|result| result.context("parquet"))
948                .boxed(),
949            (FormatKind::Parquet(_), RequestKind::Csv(_))
950            | (FormatKind::Csv(_), RequestKind::Parquet(_)) => {
951                unreachable!("programming error, {self:?}")
952            }
953        }
954    }
955
956    fn decode_chunk(
957        &self,
958        chunk: Self::RecordChunk,
959        rows: &mut Vec<Row>,
960    ) -> Result<usize, StorageErrorX> {
961        match (self, chunk) {
962            (FormatKind::Csv(csv), RecordChunkKind::Csv(chunk)) => {
963                csv.decode_chunk(chunk, rows).context("csv")
964            }
965            (FormatKind::Parquet(parquet), RecordChunkKind::Parquet(chunk)) => {
966                parquet.decode_chunk(chunk, rows).context("parquet")
967            }
968            (FormatKind::Parquet(_), RecordChunkKind::Csv(_))
969            | (FormatKind::Csv(_), RecordChunkKind::Parquet(_)) => {
970                unreachable!("programming error, {self:?}")
971            }
972        }
973    }
974}
975
/// Enum wrapper around [`OneshotFormat::WorkRequest`]s, see [`FormatKind`] for more details.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub(crate) enum RequestKind<O, C> {
    /// A work request for the CSV format.
    Csv(CsvWorkRequest<O, C>),
    /// A work request for the Parquet format.
    Parquet(ParquetWorkRequest<O, C>),
}
981
/// Enum wrapper around [`OneshotFormat::RecordChunk`]s, see [`FormatKind`] for more details.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub(crate) enum RecordChunkKind {
    /// A chunk of records fetched by the CSV format.
    Csv(CsvRecord),
    /// A chunk of records fetched by the Parquet format.
    Parquet(ParquetRowGroup),
}
987
/// Filter determining which discovered objects should be ingested.
pub(crate) enum ObjectFilter {
    /// Include every object.
    None,
    /// Include only objects whose path is in this set.
    Files(BTreeSet<Box<str>>),
    /// Include objects whose path matches this glob pattern.
    Pattern(glob::Pattern),
}
993
994impl ObjectFilter {
995    pub fn try_new(filter: ContentFilter) -> Result<Self, anyhow::Error> {
996        match filter {
997            ContentFilter::None => Ok(ObjectFilter::None),
998            ContentFilter::Files(files) => {
999                let files = files.into_iter().map(|f| f.into()).collect();
1000                Ok(ObjectFilter::Files(files))
1001            }
1002            ContentFilter::Pattern(pattern) => {
1003                let pattern = glob::Pattern::new(&pattern)?;
1004                Ok(ObjectFilter::Pattern(pattern))
1005            }
1006        }
1007    }
1008
1009    /// Returns if the object should be included.
1010    pub fn filter<S: OneshotSource>(&self, object: &S::Object) -> bool {
1011        match self {
1012            ObjectFilter::None => true,
1013            ObjectFilter::Files(files) => files.contains(object.path()),
1014            ObjectFilter::Pattern(pattern) => pattern.matches(object.path()),
1015        }
1016    }
1017}
1018
/// Experimental Error Type.
///
/// The goal of this type is to combine concepts from both `thiserror` and
/// `anyhow`. Having "strongly typed" errors from `thiserror` is useful for
/// determining what action to take and tracking the context of an error like
/// `anyhow` is useful for determining where an error came from.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct StorageErrorX {
    /// The underlying, strongly typed, cause of the error.
    kind: StorageErrorXKind,
    /// Human readable context, pushed to the back as the error propagates
    /// outward (innermost context first).
    context: LinkedList<String>,
}
1030
1031impl fmt::Display for StorageErrorX {
1032    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1033        writeln!(f, "error: {}", self.kind)?;
1034        writeln!(f, "causes: {:?}", self.context)?;
1035        Ok(())
1036    }
1037}
1038
/// Experimental Error Type, see [`StorageErrorX`].
///
/// Variants store stringified errors (`Arc<str>`/`String`) rather than the
/// source error types, which keeps this type `Clone`, `Serialize`, and
/// `Deserialize`.
#[derive(Debug, Clone, Serialize, Deserialize, thiserror::Error)]
pub enum StorageErrorXKind {
    #[error("csv decoding error: {0}")]
    CsvDecoding(Arc<str>),
    #[error("parquet error: {0}")]
    ParquetError(Arc<str>),
    #[error("reqwest error: {0}")]
    Reqwest(Arc<str>),
    #[error("aws s3 request error: {0}")]
    AwsS3Request(String),
    #[error("aws s3 bytestream error: {0}")]
    AwsS3Bytes(Arc<str>),
    #[error("invalid reqwest header: {0}")]
    InvalidHeader(Arc<str>),
    #[error("failed to decode Row from a record batch: {0}")]
    InvalidRecordBatch(Arc<str>),
    #[error("programming error: {0}")]
    ProgrammingError(Arc<str>),
    #[error("failed to get the size of an object")]
    MissingSize,
    #[error("object is missing the required '{0}' field")]
    MissingField(Arc<str>),
    #[error("failed while evaluating the provided mfp: '{0}'")]
    MfpEvalError(Arc<str>),
    #[error("no matching files found at the given url")]
    NoMatchingFiles,
    #[error("something went wrong: {0}")]
    Generic(String),
}
1069
1070impl From<csv_async::Error> for StorageErrorXKind {
1071    fn from(err: csv_async::Error) -> Self {
1072        StorageErrorXKind::CsvDecoding(err.to_string().into())
1073    }
1074}
1075
1076impl From<reqwest::Error> for StorageErrorXKind {
1077    fn from(err: reqwest::Error) -> Self {
1078        StorageErrorXKind::Reqwest(err.to_string().into())
1079    }
1080}
1081
1082impl From<reqwest::header::ToStrError> for StorageErrorXKind {
1083    fn from(err: reqwest::header::ToStrError) -> Self {
1084        StorageErrorXKind::InvalidHeader(err.to_string().into())
1085    }
1086}
1087
1088impl From<aws_smithy_types::byte_stream::error::Error> for StorageErrorXKind {
1089    fn from(err: aws_smithy_types::byte_stream::error::Error) -> Self {
1090        StorageErrorXKind::AwsS3Bytes(DisplayErrorContext(err).to_string().into())
1091    }
1092}
1093
1094impl From<::parquet::errors::ParquetError> for StorageErrorXKind {
1095    fn from(err: ::parquet::errors::ParquetError) -> Self {
1096        StorageErrorXKind::ParquetError(err.to_string().into())
1097    }
1098}
1099
1100impl StorageErrorXKind {
1101    pub fn with_context<C: Display>(self, context: C) -> StorageErrorX {
1102        StorageErrorX {
1103            kind: self,
1104            context: LinkedList::from([context.to_string()]),
1105        }
1106    }
1107
1108    pub fn invalid_record_batch<S: Into<Arc<str>>>(error: S) -> StorageErrorXKind {
1109        StorageErrorXKind::InvalidRecordBatch(error.into())
1110    }
1111
1112    pub fn generic<C: Display>(error: C) -> StorageErrorXKind {
1113        StorageErrorXKind::Generic(error.to_string())
1114    }
1115
1116    pub fn programming_error<S: Into<Arc<str>>>(error: S) -> StorageErrorXKind {
1117        StorageErrorXKind::ProgrammingError(error.into())
1118    }
1119}
1120
1121impl<E> From<E> for StorageErrorX
1122where
1123    E: Into<StorageErrorXKind>,
1124{
1125    fn from(err: E) -> Self {
1126        StorageErrorX {
1127            kind: err.into(),
1128            context: LinkedList::new(),
1129        }
1130    }
1131}
1132
/// Helper trait for attaching human readable context to errors as they
/// propagate, in the style of `anyhow::Context`.
trait StorageErrorXContext<T> {
    /// Attach `context` to the error, if this result is an `Err`.
    fn context<C>(self, context: C) -> Result<T, StorageErrorX>
    where
        C: Display;
}
1138
1139impl<T, E> StorageErrorXContext<T> for Result<T, E>
1140where
1141    E: Into<StorageErrorXKind>,
1142{
1143    fn context<C>(self, context: C) -> Result<T, StorageErrorX>
1144    where
1145        C: Display,
1146    {
1147        match self {
1148            Ok(val) => Ok(val),
1149            Err(kind) => Err(StorageErrorX {
1150                kind: kind.into(),
1151                context: LinkedList::from([context.to_string()]),
1152            }),
1153        }
1154    }
1155}
1156
1157impl<T> StorageErrorXContext<T> for Result<T, StorageErrorX> {
1158    fn context<C>(self, context: C) -> Result<T, StorageErrorX>
1159    where
1160        C: Display,
1161    {
1162        match self {
1163            Ok(val) => Ok(val),
1164            Err(mut e) => {
1165                e.context.push_back(context.to_string());
1166                Err(e)
1167            }
1168        }
1169    }
1170}