// mz_storage_operators/oneshot_source.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! "Oneshot" sources are a one-time ingestion of data from an external system, unlike traditional
11//! sources, they __do not__ run continuously. Oneshot sources are generally used for `COPY FROM`
12//! SQL statements.
13//!
14//! The implementation of reading and parsing data is behind the [`OneshotSource`] and
15//! [`OneshotFormat`] traits, respectively. Users looking to add new sources or formats, should
16//! only need to add new implementations for these traits.
17//!
18//! * [`OneshotSource`] is an interface for listing and reading from an external system, e.g. an
19//!   HTTP server.
20//! * [`OneshotFormat`] is an interface for how to parallelize and parse data, e.g. CSV.
21//!
22//! Given a [`OneshotSource`] and a [`OneshotFormat`] we build a dataflow structured like the
23//! following:
24//!
25//! ```text
26//!             ┏━━━━━━━━━━━━━━━┓
27//!             ┃    Discover   ┃
28//!             ┃    objects    ┃
29//!             ┗━━━━━━━┯━━━━━━━┛
30//!           ┌───< Distribute >───┐
31//!           │                    │
32//!     ┏━━━━━v━━━━┓         ┏━━━━━v━━━━┓
33//!     ┃  Split   ┃   ...   ┃  Split   ┃
34//!     ┃  Work 1  ┃         ┃  Work n  ┃
35//!     ┗━━━━━┯━━━━┛         ┗━━━━━┯━━━━┛
36//!           │                    │
37//!           ├───< Distribute >───┤
38//!           │                    │
39//!     ┏━━━━━v━━━━┓         ┏━━━━━v━━━━┓
40//!     ┃  Fetch   ┃   ...   ┃  Fetch   ┃
41//!     ┃  Work 1  ┃         ┃  Work n  ┃
42//!     ┗━━━━━┯━━━━┛         ┗━━━━━┯━━━━┛
43//!           │                    │
44//!           ├───< Distribute >───┤
45//!           │                    │
46//!     ┏━━━━━v━━━━┓         ┏━━━━━v━━━━┓
47//!     ┃  Decode  ┃   ...   ┃  Decode  ┃
48//!     ┃  Chunk 1 ┃         ┃  Chunk n ┃
49//!     ┗━━━━━┯━━━━┛         ┗━━━━━┯━━━━┛
50//!           │                    │
51//!           │                    │
52//!     ┏━━━━━v━━━━┓         ┏━━━━━v━━━━┓
53//!     ┃  Stage   ┃   ...   ┃  Stage   ┃
54//!     ┃  Batch 1 ┃         ┃  Batch n ┃
55//!     ┗━━━━━┯━━━━┛         ┗━━━━━┯━━━━┛
56//!           │                    │
57//!           └─────────┬──────────┘
58//!               ┏━━━━━v━━━━┓
59//!               ┃  Result  ┃
60//!               ┃ Callback ┃
61//!               ┗━━━━━━━━━━┛
62//! ```
63//!
64
65use std::fmt;
66use std::sync::Arc;
67
68use aws_sdk_s3::error::DisplayErrorContext;
69use bytes::Bytes;
70use differential_dataflow::Hashable;
71use futures::stream::BoxStream;
72use futures::{StreamExt, TryStreamExt};
73use mz_expr::SafeMfpPlan;
74use mz_ore::cast::CastFrom;
75use mz_persist_client::Diagnostics;
76use mz_persist_client::batch::ProtoBatch;
77use mz_persist_client::cache::PersistClientCache;
78use mz_persist_types::Codec;
79use mz_persist_types::codec_impls::UnitSchema;
80use mz_repr::{DatumVec, GlobalId, Row, RowArena, Timestamp};
81use mz_storage_types::StorageDiff;
82use mz_storage_types::connections::ConnectionContext;
83use mz_storage_types::controller::CollectionMetadata;
84use mz_storage_types::oneshot_sources::{
85    ContentFilter, ContentFormat, ContentSource, OneshotIngestionRequest,
86};
87use mz_storage_types::sources::SourceData;
88use mz_timely_util::builder_async::{
89    Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
90};
91use mz_timely_util::pact::Distribute;
92use serde::de::DeserializeOwned;
93use serde::{Deserialize, Serialize};
94use std::collections::{BTreeSet, LinkedList};
95use std::fmt::{Debug, Display};
96use std::future::Future;
97use timely::container::CapacityContainerBuilder;
98use timely::dataflow::channels::pact::Pipeline;
99use timely::dataflow::{Scope, StreamVec};
100use timely::progress::Antichain;
101use tracing::info;
102
103use crate::oneshot_source::aws_source::{AwsS3Source, S3Checksum, S3Object};
104use crate::oneshot_source::csv::{CsvDecoder, CsvRecord, CsvWorkRequest};
105use crate::oneshot_source::http_source::{HttpChecksum, HttpObject, HttpOneshotSource};
106use crate::oneshot_source::parquet::{ParquetFormat, ParquetRowGroup, ParquetWorkRequest};
107
108pub mod csv;
109pub mod parquet;
110
111pub mod aws_source;
112pub mod http_source;
113
114mod util;
115
116/// Render a dataflow to do a "oneshot" ingestion.
117///
118/// Roughly the operators we render do the following:
119///
120/// 1. Discover objects with a [`OneshotSource`].
121/// 2. Split objects into separate units of work based on the [`OneshotFormat`].
122/// 3. Fetch individual units of work (aka fetch byte blobs) with the
123///    [`OneshotFormat`] and [`OneshotSource`].
124/// 4. Decode the fetched byte blobs into [`Row`]s.
125/// 5. Stage the [`Row`]s into Persist returning [`ProtoBatch`]es.
126///
127/// TODO(cf3): Benchmark combining operators 3, 4, and 5. Currently we keep them
128/// separate for the [`CsvDecoder`]. CSV decoding is hard to do in parallel so we
129/// currently have a single worker Fetch an entire file, and then distributes
130/// chunks for parallel Decoding. We should benchmark if this is actually faster
131/// than just a single worker both fetching and decoding.
132///
133pub fn render<'scope, F>(
134    scope: Scope<'scope, Timestamp>,
135    persist_clients: Arc<PersistClientCache>,
136    connection_context: ConnectionContext,
137    collection_id: GlobalId,
138    collection_meta: CollectionMetadata,
139    request: OneshotIngestionRequest,
140    worker_callback: F,
141) -> Vec<PressOnDropButton>
142where
143    F: FnOnce(Result<Option<ProtoBatch>, String>) -> () + 'static,
144{
145    let OneshotIngestionRequest {
146        source,
147        format,
148        filter,
149        shape,
150    } = request;
151
152    let source = match source {
153        ContentSource::Http { url } => {
154            let source = HttpOneshotSource::new(reqwest::Client::default(), url);
155            SourceKind::Http(source)
156        }
157        ContentSource::AwsS3 {
158            connection,
159            connection_id,
160            uri,
161        } => {
162            // Checksum validation does not work with GCS when using ranges, which happens with parquet.
163            // So, we disable checksum if both the endpoint is overridden to a non-AWS endpoint and the format is Parquet.
164            let use_checksum = !(connection.endpoint.is_some()
165                && !connection.endpoint.as_ref().unwrap().contains("amazonaws"))
166                || format != ContentFormat::Parquet;
167            if !use_checksum {
168                tracing::info!(
169                    "disabling checksum validation for S3 source because endpoint: {:?} is overridden and format is Parquet",
170                    connection.endpoint
171                );
172            }
173            let source = AwsS3Source::new(
174                connection,
175                connection_id,
176                connection_context,
177                uri,
178                use_checksum,
179            );
180            SourceKind::AwsS3(source)
181        }
182    };
183    tracing::info!(?source, "created oneshot source");
184
185    let format = match format {
186        ContentFormat::Csv(params) => {
187            let format = CsvDecoder::new(params, &shape.source_desc);
188            FormatKind::Csv(format)
189        }
190        ContentFormat::Parquet => {
191            let format = ParquetFormat::new(shape.source_desc);
192            FormatKind::Parquet(format)
193        }
194    };
195
196    // Discover what objects are available to copy.
197    let (objects_stream, discover_token) =
198        render_discover_objects(scope.clone(), collection_id, source.clone(), filter);
199    // Split the objects into individual units of work.
200    let (work_stream, split_token) = render_split_work(
201        scope.clone(),
202        collection_id,
203        objects_stream,
204        source.clone(),
205        format.clone(),
206    );
207    // Fetch each unit of work, returning chunks of records.
208    let (records_stream, fetch_token) = render_fetch_work(
209        scope.clone(),
210        collection_id,
211        source.clone(),
212        format.clone(),
213        work_stream,
214    );
215    // Parse chunks of records into Rows.
216    let (rows_stream, decode_token) = render_decode_chunk(
217        scope.clone(),
218        format.clone(),
219        records_stream,
220        shape.source_mfp,
221    );
222    // Stage the Rows in Persist.
223    let (batch_stream, batch_token) = render_stage_batches_operator(
224        scope.clone(),
225        collection_id,
226        &collection_meta,
227        persist_clients,
228        rows_stream,
229    );
230
231    // Collect all results together and notify the upstream of whether or not we succeeded.
232    render_completion_operator(scope, batch_stream, worker_callback);
233
234    let tokens = vec![
235        discover_token,
236        split_token,
237        fetch_token,
238        decode_token,
239        batch_token,
240    ];
241
242    tokens
243}
244
/// Render an operator that using a [`OneshotSource`] will discover what objects are available
/// for fetching.
///
/// Only one worker (chosen deterministically by hashing `collection_id`) actually lists the
/// source; all other workers build the operator but return immediately. Errors (filter
/// construction, listing, or no files matching the filter) are emitted downstream as `Err`
/// data rather than panicking the operator.
pub fn render_discover_objects<'scope, S>(
    scope: Scope<'scope, Timestamp>,
    collection_id: GlobalId,
    source: S,
    filter: ContentFilter,
) -> (
    StreamVec<'scope, Timestamp, Result<(S::Object, S::Checksum), StorageErrorX>>,
    PressOnDropButton,
)
where
    S: OneshotSource + 'static,
{
    // Only a single worker is responsible for discovering objects.
    //
    // Hashing the collection ID (plus a tag to decorrelate from other operators) gives a
    // deterministic "active" worker agreed upon by all peers.
    let worker_id = scope.index();
    let num_workers = scope.peers();
    let active_worker_id = usize::cast_from((collection_id, "discover").hashed()) % num_workers;
    let is_active_worker = worker_id == active_worker_id;

    let mut builder = AsyncOperatorBuilder::new("CopyFrom-discover".to_string(), scope.clone());

    let (start_handle, start_stream) = builder.new_output::<CapacityContainerBuilder<_>>();

    let shutdown = builder.build(move |caps| async move {
        let [start_cap] = caps.try_into().unwrap();

        // Non-active workers produce nothing; the downstream Distribute pact re-balances.
        if !is_active_worker {
            return;
        }

        // Compile the user-provided filter; on failure report the error downstream and bail.
        let filter = match ObjectFilter::try_new(filter) {
            Ok(filter) => filter,
            Err(err) => {
                tracing::warn!(?err, "failed to create filter");
                start_handle.give(&start_cap, Err(StorageErrorXKind::generic(err).into()));
                return;
            }
        };

        // List every object in the source, then partition into objects we want vs. skip.
        let work = source.list().await.context("list");
        match work {
            Ok(objects) => {
                let (include, exclude): (Vec<_>, Vec<_>) = objects
                    .into_iter()
                    .partition(|(o, _checksum)| filter.filter::<S>(o));
                tracing::info!(%worker_id, ?include, ?exclude, "listed objects");
                // An empty `include` set is surfaced as an explicit error so the user
                // learns their filter matched nothing, instead of a silent no-op COPY.
                if include.is_empty() {
                    let err = StorageErrorXKind::NoMatchingFiles.with_context("discover");
                    start_handle.give(&start_cap, Err(err));
                    return;
                }
                include
                    .into_iter()
                    .for_each(|object| start_handle.give(&start_cap, Ok(object)))
            }
            Err(err) => {
                tracing::warn!(?err, "failed to list oneshot source");
                start_handle.give(&start_cap, Err(err))
            }
        }
    });

    (start_stream, shutdown.press_on_drop())
}
310
/// Render an operator that given a stream of [`OneshotSource::Object`]s will split them into units
/// of work based on the provided [`OneshotFormat`].
///
/// Objects arrive via the `Distribute` pact, so splitting is spread across all workers.
/// Any upstream error, or an error while splitting, is forwarded downstream as `Err` data.
pub fn render_split_work<'scope, T, S, F>(
    scope: Scope<'scope, T>,
    collection_id: GlobalId,
    objects: StreamVec<'scope, T, Result<(S::Object, S::Checksum), StorageErrorX>>,
    source: S,
    format: F,
) -> (
    StreamVec<'scope, T, Result<F::WorkRequest<S>, StorageErrorX>>,
    PressOnDropButton,
)
where
    T: timely::progress::Timestamp,
    S: OneshotSource + Send + Sync + 'static,
    F: OneshotFormat + Send + Sync + 'static,
{
    let worker_id = scope.index();
    let mut builder = AsyncOperatorBuilder::new("CopyFrom-split_work".to_string(), scope.clone());

    let (request_handle, request_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
    // `Distribute` shuffles objects across workers so splitting is parallelized.
    let mut objects_handle = builder.new_input_for(objects, Distribute, &request_handle);

    let shutdown = builder.build(move |caps| async move {
        let [_objects_cap] = caps.try_into().unwrap();

        info!(%collection_id, %worker_id, "CopyFrom Split Work");

        while let Some(event) = objects_handle.next().await {
            // We only act on data; progress updates carry no work for this operator.
            let (capability, maybe_objects) = match event {
                AsyncEvent::Data(cap, req) => (cap, req),
                AsyncEvent::Progress(_) => continue,
            };

            // Nest the `split_work(...)` method in an async-block so we can use the `?`
            // without returning from the entire operator, and so we can add context.
            let result = async {
                let mut requests = Vec::new();

                for maybe_object in maybe_objects {
                    // Return early if the upstream Discover step failed.
                    let (object, checksum) = maybe_object?;

                    // Split on a separate task; clones keep the outer `format`/`source`
                    // available for subsequent objects.
                    let format_ = format.clone();
                    let source_ = source.clone();
                    let work_requests = mz_ore::task::spawn(|| "split-work", async move {
                        info!(%worker_id, object = %object.name(), "splitting object");
                        format_.split_work(source_.clone(), object, checksum).await
                    })
                    .await?;

                    requests.extend(work_requests);
                }

                Ok::<_, StorageErrorX>(requests)
            }
            .await
            .context("split");

            // Emit either every work request, or the single error that stopped us.
            match result {
                Ok(requests) => requests
                    .into_iter()
                    .for_each(|req| request_handle.give(&capability, Ok(req))),
                Err(err) => request_handle.give(&capability, Err(err)),
            }
        }
    });

    (request_stream, shutdown.press_on_drop())
}
381
/// Render an operator that given a stream [`OneshotFormat::WorkRequest`]s will fetch chunks of the
/// remote [`OneshotSource::Object`] and return a stream of [`OneshotFormat::RecordChunk`]s that
/// can be decoded into [`Row`]s.
///
/// Work requests arrive via the `Distribute` pact so fetching is spread across workers.
/// Chunks are emitted as soon as they are received; a fetch error stops consumption of the
/// current work stream and is forwarded downstream as `Err` data.
pub fn render_fetch_work<'scope, T, S, F>(
    scope: Scope<'scope, T>,
    collection_id: GlobalId,
    source: S,
    format: F,
    work_requests: StreamVec<'scope, T, Result<F::WorkRequest<S>, StorageErrorX>>,
) -> (
    StreamVec<'scope, T, Result<F::RecordChunk, StorageErrorX>>,
    PressOnDropButton,
)
where
    T: timely::progress::Timestamp,
    S: OneshotSource + Sync + 'static,
    F: OneshotFormat + Sync + 'static,
{
    let worker_id = scope.index();
    let mut builder = AsyncOperatorBuilder::new("CopyFrom-fetch_work".to_string(), scope.clone());

    let (record_handle, record_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
    // `Distribute` shuffles requests so each worker fetches a share of the data.
    let mut work_requests_handle = builder.new_input_for(work_requests, Distribute, &record_handle);

    let shutdown = builder.build(move |caps| async move {
        let [_work_cap] = caps.try_into().unwrap();

        info!(%collection_id, %worker_id, "CopyFrom Fetch Work");

        while let Some(event) = work_requests_handle.next().await {
            let (capability, maybe_requests) = match event {
                AsyncEvent::Data(cap, req) => (cap, req),
                AsyncEvent::Progress(_) => continue,
            };

            // Wrap our work in a block to capture `?`.
            let result = async {
                // Process each stream of work, one at a time.
                for maybe_request in maybe_requests {
                    // Propagate any error from the upstream Split step.
                    let request = maybe_request?;

                    let mut work_stream = format.fetch_work(&source, request);
                    while let Some(result) = work_stream.next().await {
                        // Returns early and stop consuming from the stream if we hit an error.
                        let record_chunk = result.context("fetch worker")?;

                        // Note: We want to give record chunks as we receive them, because a work
                        // request may be for an entire file.
                        //
                        // TODO(cf3): Investigate if some small batching here would help perf.
                        record_handle.give(&capability, Ok(record_chunk));
                    }
                }

                Ok::<_, StorageErrorX>(())
            }
            .await
            .context("fetch work");

            // Successful chunks were already emitted above; only the error needs forwarding.
            if let Err(err) = result {
                tracing::warn!(?err, "failed to fetch");
                record_handle.give(&capability, Err(err))
            }
        }
    });

    (record_stream, shutdown.press_on_drop())
}
450
/// Render an operator that given a stream of [`OneshotFormat::RecordChunk`]s will decode these
/// chunks into a stream of [`Row`]s.
///
/// After decoding, each row is passed through the provided [`SafeMfpPlan`] to re-order
/// columns and/or fill in defaults. The MFP is expected to map/project only — a row being
/// filtered out is treated as a logic error (soft panic), not a user error.
pub fn render_decode_chunk<'scope, T, F>(
    scope: Scope<'scope, T>,
    format: F,
    record_chunks: StreamVec<'scope, T, Result<F::RecordChunk, StorageErrorX>>,
    mfp: SafeMfpPlan,
) -> (
    StreamVec<'scope, T, Result<Row, StorageErrorX>>,
    PressOnDropButton,
)
where
    T: timely::progress::Timestamp,
    F: OneshotFormat + 'static,
{
    let mut builder = AsyncOperatorBuilder::new("CopyFrom-decode_chunk".to_string(), scope.clone());

    let (row_handle, row_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
    // `Distribute` shuffles chunks so decoding is parallelized across workers.
    let mut record_chunk_handle = builder.new_input_for(record_chunks, Distribute, &row_handle);

    let shutdown = builder.build(move |caps| async move {
        let [_row_cap] = caps.try_into().unwrap();

        // Scratch buffers reused across all rows to avoid per-row allocation.
        let mut datum_vec = DatumVec::default();
        let row_arena = RowArena::default();
        let mut row_buf = Row::default();

        while let Some(event) = record_chunk_handle.next().await {
            let (capability, maybe_chunks) = match event {
                AsyncEvent::Data(cap, data) => (cap, data),
                AsyncEvent::Progress(_) => continue,
            };

            // Decode all chunks in this event; `?` short-circuits on the first failure.
            let result = async {
                let mut rows = Vec::new();
                for maybe_chunk in maybe_chunks {
                    let chunk = maybe_chunk?;
                    format.decode_chunk(chunk, &mut rows)?;
                }
                Ok::<_, StorageErrorX>(rows)
            }
            .await
            .context("decode chunk");

            match result {
                Ok(rows) => {
                    // For each row of source data, we pass it through an MFP to re-arrange column
                    // orders and/or fill in default values for missing columns.
                    for row in rows {
                        let mut datums = datum_vec.borrow_with(&row);
                        let result = mfp
                            .evaluate_into(&mut *datums, &row_arena, &mut row_buf)
                            .map(|row| row.cloned());

                        match result {
                            Ok(Some(row)) => row_handle.give(&capability, Ok(row)),
                            Ok(None) => {
                                // We only expect the provided MFP to map rows from the source data
                                // and project default values.
                                mz_ore::soft_panic_or_log!("oneshot source MFP filtered out data!");
                            }
                            Err(e) => {
                                // MFP evaluation errors (e.g. a failing default expression) are
                                // forwarded downstream rather than aborting the operator.
                                let err = StorageErrorXKind::MfpEvalError(e.to_string().into())
                                    .with_context("decode");
                                row_handle.give(&capability, Err(err))
                            }
                        }
                    }
                }
                Err(err) => row_handle.give(&capability, Err(err)),
            }
        }
    });

    (row_stream, shutdown.press_on_drop())
}
527
/// Render an operator that given a stream of [`Row`]s will stage them in Persist and return a
/// stream of [`ProtoBatch`]es that can later be linked into a shard.
///
/// Rows are validated against the collection's schema before staging. On the first upstream
/// error the in-progress batch is deleted (so we don't leak staged data) and the error is
/// forwarded; otherwise one [`ProtoBatch`] is emitted per worker at operator shutdown.
pub fn render_stage_batches_operator<'scope, T>(
    scope: Scope<'scope, T>,
    collection_id: GlobalId,
    collection_meta: &CollectionMetadata,
    persist_clients: Arc<PersistClientCache>,
    rows_stream: StreamVec<'scope, T, Result<Row, StorageErrorX>>,
) -> (
    StreamVec<'scope, T, Result<ProtoBatch, StorageErrorX>>,
    PressOnDropButton,
)
where
    T: timely::progress::Timestamp,
{
    let persist_location = collection_meta.persist_location.clone();
    let shard_id = collection_meta.data_shard;
    let collection_desc = Arc::new(collection_meta.relation_desc.clone());

    let mut builder =
        AsyncOperatorBuilder::new("CopyFrom-stage_batches".to_string(), scope.clone());

    let (proto_batch_handle, proto_batch_stream) =
        builder.new_output::<CapacityContainerBuilder<_>>();
    // `Pipeline`: rows stay on the worker that decoded them; no re-shuffle needed to stage.
    let mut rows_handle = builder.new_input_for(rows_stream, Pipeline, &proto_batch_handle);

    let shutdown = builder.build(move |caps| async move {
        let [proto_batch_cap] = caps.try_into().unwrap();

        // Open a Persist handle that we can use to stage a batch.
        let persist_client = persist_clients
            .open(persist_location)
            .await
            .expect("failed to open Persist client");
        let persist_diagnostics = Diagnostics {
            shard_name: collection_id.to_string(),
            handle_purpose: "CopyFrom::stage_batches".to_string(),
        };
        let write_handle = persist_client
            .open_writer::<SourceData, (), mz_repr::Timestamp, StorageDiff>(
                shard_id,
                Arc::clone(&collection_desc),
                Arc::new(UnitSchema),
                persist_diagnostics,
            )
            .await
            .expect("could not open Persist shard");

        // Create a batch using the minimum timestamp since these batches will
        // get sent back to `environmentd` and their timestamps re-written
        // before being finally appended.
        let lower = mz_repr::Timestamp::MIN;
        let upper = Antichain::from_elem(lower.step_forward());

        let mut batch_builder = write_handle.builder(Antichain::from_elem(lower));

        while let Some(event) = rows_handle.next().await {
            // Only data events carry rows; progress is irrelevant here.
            let AsyncEvent::Data(_, row_batch) = event else {
                continue;
            };

            // Pull Rows off our stream and stage them into a Batch.
            for maybe_row in row_batch {
                // Validate the row against the collection's schema before staging it.
                let maybe_row = maybe_row.and_then(|row| {
                    Row::validate(&row, &*collection_desc).map_err(|e| {
                        StorageErrorXKind::invalid_record_batch(e).with_context("stage_batches")
                    })?;
                    Ok(row)
                });
                match maybe_row {
                    // Happy path, add the Row to our batch!
                    Ok(row) => {
                        let data = SourceData(Ok(row));
                        batch_builder
                            .add(&data, &(), &lower, &1)
                            .await
                            .expect("failed to add Row to batch");
                    }
                    // Sad path, something upstream hit an error.
                    Err(err) => {
                        // Clean up our in-progress batch so we don't leak data.
                        let batch = batch_builder
                            .finish(upper)
                            .await
                            .expect("failed to cleanup batch");
                        batch.delete().await;

                        // Pass on the error.
                        proto_batch_handle
                            .give(&proto_batch_cap, Err(err).context("stage batches"));
                        return;
                    }
                }
            }
        }

        let batch = batch_builder
            .finish(upper)
            .await
            .expect("failed to create Batch");

        // Turn our Batch into a ProtoBatch that will later be linked in to
        // the shard.
        //
        // Note: By turning this into a ProtoBatch, the onus is now on us to
        // cleanup the Batch if it's never linked into the shard.
        //
        // TODO(cf2): Make sure these batches get cleaned up if another
        // worker encounters an error.
        let proto_batch = batch.into_transmittable_batch();
        proto_batch_handle.give(&proto_batch_cap, Ok(proto_batch));
    });

    (proto_batch_stream, shutdown.press_on_drop())
}
643
/// Render an operator that given a stream of [`ProtoBatch`]es will call our `worker_callback` to
/// report the results upstream.
///
/// The callback receives `Ok(Some(batch))` when this worker staged a batch, `Ok(None)` when it
/// staged nothing, and `Err(msg)` if any upstream stage failed. Currently at most one batch per
/// worker is supported (see TODO below).
pub fn render_completion_operator<'scope, T, F>(
    scope: Scope<'scope, T>,
    results_stream: StreamVec<'scope, T, Result<ProtoBatch, StorageErrorX>>,
    worker_callback: F,
) where
    T: timely::progress::Timestamp,
    F: FnOnce(Result<Option<ProtoBatch>, String>) -> () + 'static,
{
    let mut builder = AsyncOperatorBuilder::new("CopyFrom-completion".to_string(), scope.clone());
    // Disconnected input: this operator produces no output, so it holds no capabilities.
    let mut results_input = builder.new_disconnected_input(results_stream, Pipeline);

    builder.build(move |_| async move {
        // Inner async block lets `?` collapse the first error into a `Result` for the callback.
        let result = async move {
            let mut maybe_payload: Option<ProtoBatch> = None;

            while let Some(event) = results_input.next().await {
                if let AsyncEvent::Data(_cap, results) = event {
                    let [result] = results
                        .try_into()
                        .expect("only 1 event on the result stream");

                    // TODO(cf2): Lift this restriction.
                    if maybe_payload.is_some() {
                        panic!("expected only one batch!");
                    }

                    // Errors are stringified here because the callback crosses a
                    // serialization boundary back to the caller.
                    maybe_payload = Some(result.map_err(|e| e.to_string())?);
                }
            }

            Ok(maybe_payload)
        }
        .await;

        // Report to the caller of our final status.
        worker_callback(result);
    });
}
684
/// An object that will be fetched from a [`OneshotSource`].
pub trait OneshotObject {
    /// Name of the object, including any extensions.
    fn name(&self) -> &str;

    /// Path of the object within the remote source.
    fn path(&self) -> &str;

    /// Size of this object in bytes.
    fn size(&self) -> usize;

    /// Encodings of the _entire_ object, if any.
    ///
    /// Note: The object may internally use compression, e.g. a Parquet file
    /// could compress its column chunks, but if the Parquet file itself is not
    /// compressed then this would return `None`.
    fn encodings(&self) -> &[Encoding];
}
703
/// Encoding of a [`OneshotObject`].
#[derive(Debug, Copy, Clone, Serialize, Deserialize)]
pub enum Encoding {
    /// bzip2 compression.
    Bzip2,
    /// gzip compression.
    Gzip,
    /// xz (LZMA) compression.
    Xz,
    /// Zstandard compression.
    Zstd,
}
712
/// Defines a remote system that we can fetch data from for a "one time" ingestion.
pub trait OneshotSource: Clone + Send + Unpin {
    /// An individual unit within the source, e.g. a file.
    type Object: OneshotObject
        + Debug
        + Clone
        + Send
        + Unpin
        + Serialize
        + DeserializeOwned
        + 'static;
    /// Checksum for a [`Self::Object`].
    type Checksum: Debug + Clone + Send + Unpin + Serialize + DeserializeOwned + 'static;

    /// Returns all of the objects for this source.
    fn list<'a>(
        &'a self,
    ) -> impl Future<Output = Result<Vec<(Self::Object, Self::Checksum)>, StorageErrorX>> + Send;

    /// Returns a stream of the data for a specific object.
    ///
    /// `range` optionally restricts the fetch to a byte range of the object; `None`
    /// presumably means the entire object — confirm against implementations.
    fn get<'s>(
        &'s self,
        object: Self::Object,
        checksum: Self::Checksum,
        range: Option<std::ops::RangeInclusive<usize>>,
    ) -> BoxStream<'s, Result<Bytes, StorageErrorX>>;
}
740
/// An enum wrapper around [`OneshotSource`]s.
///
/// An alternative to this wrapper would be to use `Box<dyn OneshotSource>`, but that requires
/// making the trait object safe and it's easier to just wrap it in an enum. Also, this wrapper
/// provides a convenient place to add [`StorageErrorXContext::context`] for all of our source
/// types.
#[derive(Clone, Debug)]
pub(crate) enum SourceKind {
    /// Fetch from an HTTP(S) server.
    Http(HttpOneshotSource),
    /// Fetch from AWS S3 (or an S3-compatible endpoint, e.g. GCS).
    AwsS3(AwsS3Source),
}
752
impl OneshotSource for SourceKind {
    type Object = ObjectKind;
    type Checksum = ChecksumKind;

    /// Lists the inner source's objects and wraps each in the matching
    /// `ObjectKind`/`ChecksumKind` variants, tagging errors with the source name.
    async fn list<'a>(&'a self) -> Result<Vec<(Self::Object, Self::Checksum)>, StorageErrorX> {
        match self {
            SourceKind::Http(http) => {
                let objects = http.list().await.context("http")?;
                let objects = objects
                    .into_iter()
                    .map(|(object, checksum)| {
                        (ObjectKind::Http(object), ChecksumKind::Http(checksum))
                    })
                    .collect();
                Ok(objects)
            }
            SourceKind::AwsS3(s3) => {
                let objects = s3.list().await.context("s3")?;
                let objects = objects
                    .into_iter()
                    .map(|(object, checksum)| {
                        (ObjectKind::AwsS3(object), ChecksumKind::AwsS3(checksum))
                    })
                    .collect();
                Ok(objects)
            }
        }
    }

    /// Delegates to the inner source's `get`, unwrapping the object/checksum enums.
    ///
    /// The source, object, and checksum variants must agree; mixed variants indicate a
    /// programming error (objects from one source fed to another) and are unreachable.
    fn get<'s>(
        &'s self,
        object: Self::Object,
        checksum: Self::Checksum,
        range: Option<std::ops::RangeInclusive<usize>>,
    ) -> BoxStream<'s, Result<Bytes, StorageErrorX>> {
        match (self, object, checksum) {
            (SourceKind::Http(http), ObjectKind::Http(object), ChecksumKind::Http(checksum)) => {
                http.get(object, checksum, range)
                    .map(|result| result.context("http"))
                    .boxed()
            }
            (SourceKind::AwsS3(s3), ObjectKind::AwsS3(object), ChecksumKind::AwsS3(checksum)) => s3
                .get(object, checksum, range)
                .map(|result| result.context("aws_s3"))
                .boxed(),
            (SourceKind::AwsS3(_) | SourceKind::Http(_), _, _) => {
                unreachable!("programming error! wrong source, object, and checksum kind");
            }
        }
    }
}
804
/// Enum wrapper for [`OneshotSource::Object`], see [`SourceKind`] for more details.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) enum ObjectKind {
    /// An object served by an HTTP server.
    Http(HttpObject),
    /// An object stored in AWS S3.
    AwsS3(S3Object),
}
811
812impl OneshotObject for ObjectKind {
813    fn name(&self) -> &str {
814        match self {
815            ObjectKind::Http(object) => object.name(),
816            ObjectKind::AwsS3(object) => object.name(),
817        }
818    }
819
820    fn path(&self) -> &str {
821        match self {
822            ObjectKind::Http(object) => object.path(),
823            ObjectKind::AwsS3(object) => object.path(),
824        }
825    }
826
827    fn size(&self) -> usize {
828        match self {
829            ObjectKind::Http(object) => object.size(),
830            ObjectKind::AwsS3(object) => object.size(),
831        }
832    }
833
834    fn encodings(&self) -> &[Encoding] {
835        match self {
836            ObjectKind::Http(object) => object.encodings(),
837            ObjectKind::AwsS3(object) => object.encodings(),
838        }
839    }
840}
841
/// Enum wrapper for [`OneshotSource::Checksum`], see [`SourceKind`] for more details.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) enum ChecksumKind {
    /// A checksum from an HTTP source.
    Http(HttpChecksum),
    /// A checksum from an AWS S3 source.
    AwsS3(S3Checksum),
}
848
/// Defines a format that we fetch for a "one time" ingestion.
pub trait OneshotFormat: Clone {
    /// A single unit of work for decoding this format, e.g. a single Parquet RowGroup.
    ///
    /// Work requests must be serializable so they can be distributed between
    /// workers in the dataflow.
    type WorkRequest<S>: Debug + Clone + Send + Serialize + DeserializeOwned + 'static
    where
        S: OneshotSource;
    /// A chunk of records in this format that can be decoded into Rows.
    type RecordChunk: Debug + Clone + Send + Serialize + DeserializeOwned + 'static;

    /// Given an upstream object, defines how we should parse this object in parallel.
    ///
    /// Note: It's totally fine to not process an object in parallel, and just return a single
    /// [`Self::WorkRequest`] here.
    fn split_work<S: OneshotSource + Send>(
        &self,
        source: S,
        object: S::Object,
        checksum: S::Checksum,
    ) -> impl Future<Output = Result<Vec<Self::WorkRequest<S>>, StorageErrorX>> + Send;

    /// Given a work request, fetch data from the [`OneshotSource`] and return it in a format that
    /// can later be decoded.
    fn fetch_work<'a, S: OneshotSource + Sync + 'static>(
        &'a self,
        source: &'a S,
        request: Self::WorkRequest<S>,
    ) -> BoxStream<'a, Result<Self::RecordChunk, StorageErrorX>>;

    /// Decode a chunk of records into [`Row`]s, appending them to `rows`.
    ///
    /// Returns a record count on success — presumably the number of rows
    /// appended; confirm against the individual format implementations.
    fn decode_chunk(
        &self,
        chunk: Self::RecordChunk,
        rows: &mut Vec<Row>,
    ) -> Result<usize, StorageErrorX>;
}
884
/// An enum wrapper around [`OneshotFormat`]s.
///
/// An alternative to this wrapper would be to use `Box<dyn OneshotFormat>`, but that requires
/// making the trait object safe and it's easier to just wrap it in an enum. Also, this wrapper
/// provides a convenient place to add [`StorageErrorXContext::context`] for all of our format
/// types.
#[derive(Clone, Debug)]
pub(crate) enum FormatKind {
    /// Decode the data as CSV.
    Csv(CsvDecoder),
    /// Decode the data as Parquet.
    Parquet(ParquetFormat),
}
896
897impl OneshotFormat for FormatKind {
898    type WorkRequest<S>
899        = RequestKind<S::Object, S::Checksum>
900    where
901        S: OneshotSource;
902    type RecordChunk = RecordChunkKind;
903
904    async fn split_work<S: OneshotSource + Send>(
905        &self,
906        source: S,
907        object: S::Object,
908        checksum: S::Checksum,
909    ) -> Result<Vec<Self::WorkRequest<S>>, StorageErrorX> {
910        match self {
911            FormatKind::Csv(csv) => {
912                let work = csv
913                    .split_work(source, object, checksum)
914                    .await
915                    .context("csv")?
916                    .into_iter()
917                    .map(RequestKind::Csv)
918                    .collect();
919                Ok(work)
920            }
921            FormatKind::Parquet(parquet) => {
922                let work = parquet
923                    .split_work(source, object, checksum)
924                    .await
925                    .context("parquet")?
926                    .into_iter()
927                    .map(RequestKind::Parquet)
928                    .collect();
929                Ok(work)
930            }
931        }
932    }
933
934    fn fetch_work<'a, S: OneshotSource + Sync + 'static>(
935        &'a self,
936        source: &'a S,
937        request: Self::WorkRequest<S>,
938    ) -> BoxStream<'a, Result<Self::RecordChunk, StorageErrorX>> {
939        match (self, request) {
940            (FormatKind::Csv(csv), RequestKind::Csv(request)) => csv
941                .fetch_work(source, request)
942                .map_ok(RecordChunkKind::Csv)
943                .map(|result| result.context("csv"))
944                .boxed(),
945            (FormatKind::Parquet(parquet), RequestKind::Parquet(request)) => parquet
946                .fetch_work(source, request)
947                .map_ok(RecordChunkKind::Parquet)
948                .map(|result| result.context("parquet"))
949                .boxed(),
950            (FormatKind::Parquet(_), RequestKind::Csv(_))
951            | (FormatKind::Csv(_), RequestKind::Parquet(_)) => {
952                unreachable!("programming error, {self:?}")
953            }
954        }
955    }
956
957    fn decode_chunk(
958        &self,
959        chunk: Self::RecordChunk,
960        rows: &mut Vec<Row>,
961    ) -> Result<usize, StorageErrorX> {
962        match (self, chunk) {
963            (FormatKind::Csv(csv), RecordChunkKind::Csv(chunk)) => {
964                csv.decode_chunk(chunk, rows).context("csv")
965            }
966            (FormatKind::Parquet(parquet), RecordChunkKind::Parquet(chunk)) => {
967                parquet.decode_chunk(chunk, rows).context("parquet")
968            }
969            (FormatKind::Parquet(_), RecordChunkKind::Csv(_))
970            | (FormatKind::Csv(_), RecordChunkKind::Parquet(_)) => {
971                unreachable!("programming error, {self:?}")
972            }
973        }
974    }
975}
976
/// Enum wrapper for [`OneshotFormat::WorkRequest`], see [`FormatKind`] for more details.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub(crate) enum RequestKind<O, C> {
    Csv(CsvWorkRequest<O, C>),
    Parquet(ParquetWorkRequest<O, C>),
}
982
/// Enum wrapper for [`OneshotFormat::RecordChunk`], see [`FormatKind`] for more details.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub(crate) enum RecordChunkKind {
    Csv(CsvRecord),
    Parquet(ParquetRowGroup),
}
988
/// Filter determining which discovered objects should be ingested.
pub(crate) enum ObjectFilter {
    /// Include every object.
    None,
    /// Include only objects whose path is in this exact set.
    Files(BTreeSet<Box<str>>),
    /// Include only objects whose path matches a glob pattern.
    Pattern(glob::Pattern),
}
994
995impl ObjectFilter {
996    pub fn try_new(filter: ContentFilter) -> Result<Self, anyhow::Error> {
997        match filter {
998            ContentFilter::None => Ok(ObjectFilter::None),
999            ContentFilter::Files(files) => {
1000                let files = files.into_iter().map(|f| f.into()).collect();
1001                Ok(ObjectFilter::Files(files))
1002            }
1003            ContentFilter::Pattern(pattern) => {
1004                let pattern = glob::Pattern::new(&pattern)?;
1005                Ok(ObjectFilter::Pattern(pattern))
1006            }
1007        }
1008    }
1009
1010    /// Returns if the object should be included.
1011    pub fn filter<S: OneshotSource>(&self, object: &S::Object) -> bool {
1012        match self {
1013            ObjectFilter::None => true,
1014            ObjectFilter::Files(files) => files.contains(object.path()),
1015            ObjectFilter::Pattern(pattern) => pattern.matches(object.path()),
1016        }
1017    }
1018}
1019
/// Experimental Error Type.
///
/// The goal of this type is to combine concepts from both `thiserror` and
/// `anyhow`. Having "stongly typed" errors from `thiserror` is useful for
/// determining what action to take and tracking the context of an error like
/// `anyhow` is useful for determining where an error came from.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct StorageErrorX {
    // The strongly typed cause of the error.
    kind: StorageErrorXKind,
    // Human-readable context frames, appended as the error propagates
    // outward via [`StorageErrorXContext::context`].
    context: LinkedList<String>,
}
1031
1032impl fmt::Display for StorageErrorX {
1033    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1034        writeln!(f, "error: {}", self.kind)?;
1035        writeln!(f, "causes: {:?}", self.context)?;
1036        Ok(())
1037    }
1038}
1039
/// Experimental Error Type, see [`StorageErrorX`].
///
/// Most variants store their message as `Arc<str>`, which makes cloning the
/// error cheap (a refcount bump rather than a string copy).
#[derive(Debug, Clone, Serialize, Deserialize, thiserror::Error)]
pub enum StorageErrorXKind {
    #[error("csv decoding error: {0}")]
    CsvDecoding(Arc<str>),
    #[error("parquet error: {0}")]
    ParquetError(Arc<str>),
    #[error("reqwest error: {0}")]
    Reqwest(Arc<str>),
    #[error("aws s3 request error: {0}")]
    AwsS3Request(String),
    #[error("aws s3 bytestream error: {0}")]
    AwsS3Bytes(Arc<str>),
    #[error("invalid reqwest header: {0}")]
    InvalidHeader(Arc<str>),
    #[error("failed to decode Row from a record batch: {0}")]
    InvalidRecordBatch(Arc<str>),
    #[error("programming error: {0}")]
    ProgrammingError(Arc<str>),
    #[error("failed to get the size of an object")]
    MissingSize,
    #[error("object is missing the required '{0}' field")]
    MissingField(Arc<str>),
    #[error("failed while evaluating the provided mfp: '{0}'")]
    MfpEvalError(Arc<str>),
    #[error("no matching files found at the given url")]
    NoMatchingFiles,
    #[error("something went wrong: {0}")]
    Generic(String),
}
1070
1071impl From<csv_async::Error> for StorageErrorXKind {
1072    fn from(err: csv_async::Error) -> Self {
1073        StorageErrorXKind::CsvDecoding(err.to_string().into())
1074    }
1075}
1076
1077impl From<reqwest::Error> for StorageErrorXKind {
1078    fn from(err: reqwest::Error) -> Self {
1079        StorageErrorXKind::Reqwest(err.to_string().into())
1080    }
1081}
1082
1083impl From<reqwest::header::ToStrError> for StorageErrorXKind {
1084    fn from(err: reqwest::header::ToStrError) -> Self {
1085        StorageErrorXKind::InvalidHeader(err.to_string().into())
1086    }
1087}
1088
1089impl From<aws_smithy_types::byte_stream::error::Error> for StorageErrorXKind {
1090    fn from(err: aws_smithy_types::byte_stream::error::Error) -> Self {
1091        StorageErrorXKind::AwsS3Bytes(DisplayErrorContext(err).to_string().into())
1092    }
1093}
1094
1095impl From<::parquet::errors::ParquetError> for StorageErrorXKind {
1096    fn from(err: ::parquet::errors::ParquetError) -> Self {
1097        StorageErrorXKind::ParquetError(err.to_string().into())
1098    }
1099}
1100
1101impl StorageErrorXKind {
1102    pub fn with_context<C: Display>(self, context: C) -> StorageErrorX {
1103        StorageErrorX {
1104            kind: self,
1105            context: LinkedList::from([context.to_string()]),
1106        }
1107    }
1108
1109    pub fn invalid_record_batch<S: Into<Arc<str>>>(error: S) -> StorageErrorXKind {
1110        StorageErrorXKind::InvalidRecordBatch(error.into())
1111    }
1112
1113    pub fn generic<C: Display>(error: C) -> StorageErrorXKind {
1114        StorageErrorXKind::Generic(error.to_string())
1115    }
1116
1117    pub fn programming_error<S: Into<Arc<str>>>(error: S) -> StorageErrorXKind {
1118        StorageErrorXKind::ProgrammingError(error.into())
1119    }
1120}
1121
1122impl<E> From<E> for StorageErrorX
1123where
1124    E: Into<StorageErrorXKind>,
1125{
1126    fn from(err: E) -> Self {
1127        StorageErrorX {
1128            kind: err.into(),
1129            context: LinkedList::new(),
1130        }
1131    }
1132}
1133
/// Helper trait for attaching human-readable context to a fallible result,
/// in the spirit of `anyhow::Context`, but producing a [`StorageErrorX`].
trait StorageErrorXContext<T> {
    fn context<C>(self, context: C) -> Result<T, StorageErrorX>
    where
        C: Display;
}
1139
1140impl<T, E> StorageErrorXContext<T> for Result<T, E>
1141where
1142    E: Into<StorageErrorXKind>,
1143{
1144    fn context<C>(self, context: C) -> Result<T, StorageErrorX>
1145    where
1146        C: Display,
1147    {
1148        match self {
1149            Ok(val) => Ok(val),
1150            Err(kind) => Err(StorageErrorX {
1151                kind: kind.into(),
1152                context: LinkedList::from([context.to_string()]),
1153            }),
1154        }
1155    }
1156}
1157
1158impl<T> StorageErrorXContext<T> for Result<T, StorageErrorX> {
1159    fn context<C>(self, context: C) -> Result<T, StorageErrorX>
1160    where
1161        C: Display,
1162    {
1163        match self {
1164            Ok(val) => Ok(val),
1165            Err(mut e) => {
1166                e.context.push_back(context.to_string());
1167                Err(e)
1168            }
1169        }
1170    }
1171}