mz_storage_operators/
oneshot_source.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
//! "Oneshot" sources are a one-time ingestion of data from an external system. Unlike traditional
//! sources, they __do not__ run continuously. Oneshot sources are generally used for `COPY FROM`
//! SQL statements.
13//!
//! The implementation of reading and parsing data is behind the [`OneshotSource`] and
//! [`OneshotFormat`] traits, respectively. Users looking to add new sources or formats should
//! only need to add new implementations of these traits.
17//!
18//! * [`OneshotSource`] is an interface for listing and reading from an external system, e.g. an
19//!   HTTP server.
20//! * [`OneshotFormat`] is an interface for how to parallelize and parse data, e.g. CSV.
21//!
22//! Given a [`OneshotSource`] and a [`OneshotFormat`] we build a dataflow structured like the
23//! following:
24//!
25//! ```text
26//!             ┏━━━━━━━━━━━━━━━┓
27//!             ┃    Discover   ┃
28//!             ┃    objects    ┃
29//!             ┗━━━━━━━┯━━━━━━━┛
30//!           ┌───< Distribute >───┐
31//!           │                    │
32//!     ┏━━━━━v━━━━┓         ┏━━━━━v━━━━┓
33//!     ┃  Split   ┃   ...   ┃  Split   ┃
34//!     ┃  Work 1  ┃         ┃  Work n  ┃
35//!     ┗━━━━━┯━━━━┛         ┗━━━━━┯━━━━┛
36//!           │                    │
37//!           ├───< Distribute >───┤
38//!           │                    │
39//!     ┏━━━━━v━━━━┓         ┏━━━━━v━━━━┓
40//!     ┃  Fetch   ┃   ...   ┃  Fetch   ┃
41//!     ┃  Work 1  ┃         ┃  Work n  ┃
42//!     ┗━━━━━┯━━━━┛         ┗━━━━━┯━━━━┛
43//!           │                    │
44//!           ├───< Distribute >───┤
45//!           │                    │
46//!     ┏━━━━━v━━━━┓         ┏━━━━━v━━━━┓
47//!     ┃  Decode  ┃   ...   ┃  Decode  ┃
48//!     ┃  Chunk 1 ┃         ┃  Chunk n ┃
49//!     ┗━━━━━┯━━━━┛         ┗━━━━━┯━━━━┛
50//!           │                    │
51//!           │                    │
52//!     ┏━━━━━v━━━━┓         ┏━━━━━v━━━━┓
53//!     ┃  Stage   ┃   ...   ┃  Stage   ┃
54//!     ┃  Batch 1 ┃         ┃  Batch n ┃
55//!     ┗━━━━━┯━━━━┛         ┗━━━━━┯━━━━┛
56//!           │                    │
57//!           └─────────┬──────────┘
58//!               ┏━━━━━v━━━━┓
59//!               ┃  Result  ┃
60//!               ┃ Callback ┃
61//!               ┗━━━━━━━━━━┛
62//! ```
63//!
64
65use std::fmt;
66use std::sync::Arc;
67
68use bytes::Bytes;
69use differential_dataflow::Hashable;
70use futures::stream::BoxStream;
71use futures::{StreamExt, TryStreamExt};
72use mz_expr::SafeMfpPlan;
73use mz_ore::cast::CastFrom;
74use mz_persist_client::Diagnostics;
75use mz_persist_client::batch::ProtoBatch;
76use mz_persist_client::cache::PersistClientCache;
77use mz_persist_types::codec_impls::UnitSchema;
78use mz_repr::{DatumVec, GlobalId, Row, RowArena, Timestamp};
79use mz_storage_types::StorageDiff;
80use mz_storage_types::connections::ConnectionContext;
81use mz_storage_types::controller::CollectionMetadata;
82use mz_storage_types::oneshot_sources::{
83    ContentFilter, ContentFormat, ContentSource, OneshotIngestionRequest,
84};
85use mz_storage_types::sources::SourceData;
86use mz_timely_util::builder_async::{
87    Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
88};
89use mz_timely_util::pact::Distribute;
90use serde::de::DeserializeOwned;
91use serde::{Deserialize, Serialize};
92use std::collections::{BTreeSet, LinkedList};
93use std::fmt::{Debug, Display};
94use std::future::Future;
95use timely::container::CapacityContainerBuilder;
96use timely::dataflow::channels::pact::Pipeline;
97use timely::dataflow::{Scope, Stream as TimelyStream};
98use timely::progress::Antichain;
99use tracing::info;
100
101use crate::oneshot_source::aws_source::{AwsS3Source, S3Checksum, S3Object};
102use crate::oneshot_source::csv::{CsvDecoder, CsvRecord, CsvWorkRequest};
103use crate::oneshot_source::http_source::{HttpChecksum, HttpObject, HttpOneshotSource};
104use crate::oneshot_source::parquet::{ParquetFormat, ParquetRowGroup, ParquetWorkRequest};
105
106pub mod csv;
107pub mod parquet;
108
109pub mod aws_source;
110pub mod http_source;
111
112mod util;
113
114/// Render a dataflow to do a "oneshot" ingestion.
115///
116/// Roughly the operators we render do the following:
117///
118/// 1. Discover objects with a [`OneshotSource`].
119/// 2. Split objects into separate units of work based on the [`OneshotFormat`].
120/// 3. Fetch individual units of work (aka fetch byte blobs) with the
121///    [`OneshotFormat`] and [`OneshotSource`].
122/// 4. Decode the fetched byte blobs into [`Row`]s.
123/// 5. Stage the [`Row`]s into Persist returning [`ProtoBatch`]es.
124///
125/// TODO(cf3): Benchmark combining operators 3, 4, and 5. Currently we keep them
126/// separate for the [`CsvDecoder`]. CSV decoding is hard to do in parallel so we
127/// currently have a single worker Fetch an entire file, and then distributes
128/// chunks for parallel Decoding. We should benchmark if this is actually faster
129/// than just a single worker both fetching and decoding.
130///
131pub fn render<G, F>(
132    scope: G,
133    persist_clients: Arc<PersistClientCache>,
134    connection_context: ConnectionContext,
135    collection_id: GlobalId,
136    collection_meta: CollectionMetadata,
137    request: OneshotIngestionRequest,
138    worker_callback: F,
139) -> Vec<PressOnDropButton>
140where
141    G: Scope<Timestamp = Timestamp>,
142    F: FnOnce(Result<Option<ProtoBatch>, String>) -> () + 'static,
143{
144    let OneshotIngestionRequest {
145        source,
146        format,
147        filter,
148        shape,
149    } = request;
150
151    let source = match source {
152        ContentSource::Http { url } => {
153            let source = HttpOneshotSource::new(reqwest::Client::default(), url);
154            SourceKind::Http(source)
155        }
156        ContentSource::AwsS3 {
157            connection,
158            connection_id,
159            uri,
160        } => {
161            let source = AwsS3Source::new(connection, connection_id, connection_context, uri);
162            SourceKind::AwsS3(source)
163        }
164    };
165    tracing::info!(?source, "created oneshot source");
166
167    let format = match format {
168        ContentFormat::Csv(params) => {
169            let format = CsvDecoder::new(params, &shape.source_desc);
170            FormatKind::Csv(format)
171        }
172        ContentFormat::Parquet => {
173            let format = ParquetFormat::new(shape.source_desc);
174            FormatKind::Parquet(format)
175        }
176    };
177
178    // Discover what objects are available to copy.
179    let (objects_stream, discover_token) =
180        render_discover_objects(scope.clone(), collection_id, source.clone(), filter);
181    // Split the objects into individual units of work.
182    let (work_stream, split_token) = render_split_work(
183        scope.clone(),
184        collection_id,
185        &objects_stream,
186        source.clone(),
187        format.clone(),
188    );
189    // Fetch each unit of work, returning chunks of records.
190    let (records_stream, fetch_token) = render_fetch_work(
191        scope.clone(),
192        collection_id,
193        source.clone(),
194        format.clone(),
195        &work_stream,
196    );
197    // Parse chunks of records into Rows.
198    let (rows_stream, decode_token) = render_decode_chunk(
199        scope.clone(),
200        format.clone(),
201        &records_stream,
202        shape.source_mfp,
203    );
204    // Stage the Rows in Persist.
205    let (batch_stream, batch_token) = render_stage_batches_operator(
206        scope.clone(),
207        collection_id,
208        &collection_meta,
209        persist_clients,
210        &rows_stream,
211    );
212
213    // Collect all results together and notify the upstream of whether or not we succeeded.
214    render_completion_operator(scope, &batch_stream, worker_callback);
215
216    let tokens = vec![
217        discover_token,
218        split_token,
219        fetch_token,
220        decode_token,
221        batch_token,
222    ];
223
224    tokens
225}
226
227/// Render an operator that using a [`OneshotSource`] will discover what objects are available
228/// for fetching.
229pub fn render_discover_objects<G, S>(
230    scope: G,
231    collection_id: GlobalId,
232    source: S,
233    filter: ContentFilter,
234) -> (
235    TimelyStream<G, Result<(S::Object, S::Checksum), StorageErrorX>>,
236    PressOnDropButton,
237)
238where
239    G: Scope<Timestamp = Timestamp>,
240    S: OneshotSource + 'static,
241{
242    // Only a single worker is responsible for discovering objects.
243    let worker_id = scope.index();
244    let num_workers = scope.peers();
245    let active_worker_id = usize::cast_from((collection_id, "discover").hashed()) % num_workers;
246    let is_active_worker = worker_id == active_worker_id;
247
248    let mut builder = AsyncOperatorBuilder::new("CopyFrom-discover".to_string(), scope.clone());
249
250    let (start_handle, start_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
251
252    let shutdown = builder.build(move |caps| async move {
253        let [start_cap] = caps.try_into().unwrap();
254
255        if !is_active_worker {
256            return;
257        }
258
259        let filter = match ObjectFilter::try_new(filter) {
260            Ok(filter) => filter,
261            Err(err) => {
262                tracing::warn!(?err, "failed to create filter");
263                start_handle.give(&start_cap, Err(StorageErrorXKind::generic(err).into()));
264                return;
265            }
266        };
267
268        let work = source.list().await.context("list");
269        match work {
270            Ok(objects) => {
271                let (include, exclude): (Vec<_>, Vec<_>) = objects
272                    .into_iter()
273                    .partition(|(o, _checksum)| filter.filter::<S>(o));
274                tracing::info!(%worker_id, ?include, ?exclude, "listed objects");
275
276                include
277                    .into_iter()
278                    .for_each(|object| start_handle.give(&start_cap, Ok(object)))
279            }
280            Err(err) => {
281                tracing::warn!(?err, "failed to list oneshot source");
282                start_handle.give(&start_cap, Err(err))
283            }
284        }
285    });
286
287    (start_stream, shutdown.press_on_drop())
288}
289
290/// Render an operator that given a stream of [`OneshotSource::Object`]s will split them into units
291/// of work based on the provided [`OneshotFormat`].
292pub fn render_split_work<G, S, F>(
293    scope: G,
294    collection_id: GlobalId,
295    objects: &TimelyStream<G, Result<(S::Object, S::Checksum), StorageErrorX>>,
296    source: S,
297    format: F,
298) -> (
299    TimelyStream<G, Result<F::WorkRequest<S>, StorageErrorX>>,
300    PressOnDropButton,
301)
302where
303    G: Scope,
304    S: OneshotSource + Send + Sync + 'static,
305    F: OneshotFormat + Send + Sync + 'static,
306{
307    let worker_id = scope.index();
308    let mut builder = AsyncOperatorBuilder::new("CopyFrom-split_work".to_string(), scope.clone());
309
310    let (request_handle, request_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
311    let mut objects_handle = builder.new_input_for(objects, Distribute, &request_handle);
312
313    let shutdown = builder.build(move |caps| async move {
314        let [_objects_cap] = caps.try_into().unwrap();
315
316        info!(%collection_id, %worker_id, "CopyFrom Split Work");
317
318        while let Some(event) = objects_handle.next().await {
319            let (capability, maybe_objects) = match event {
320                AsyncEvent::Data(cap, req) => (cap, req),
321                AsyncEvent::Progress(_) => continue,
322            };
323
324            // Nest the `split_work(...)` method in an async-block so we can use the `?`
325            // without returning from the entire operator, and so we can add context.
326            let result = async {
327                let mut requests = Vec::new();
328
329                for maybe_object in maybe_objects {
330                    // Return early if the upstream Discover step failed.
331                    let (object, checksum) = maybe_object?;
332
333                    let format_ = format.clone();
334                    let source_ = source.clone();
335                    let work_requests = mz_ore::task::spawn(|| "split-work", async move {
336                        info!(%worker_id, object = %object.name(), "splitting object");
337                        format_.split_work(source_.clone(), object, checksum).await
338                    })
339                    .await?;
340
341                    requests.extend(work_requests);
342                }
343
344                Ok::<_, StorageErrorX>(requests)
345            }
346            .await
347            .context("split");
348
349            match result {
350                Ok(requests) => requests
351                    .into_iter()
352                    .for_each(|req| request_handle.give(&capability, Ok(req))),
353                Err(err) => request_handle.give(&capability, Err(err)),
354            }
355        }
356    });
357
358    (request_stream, shutdown.press_on_drop())
359}
360
361/// Render an operator that given a stream [`OneshotFormat::WorkRequest`]s will fetch chunks of the
362/// remote [`OneshotSource::Object`] and return a stream of [`OneshotFormat::RecordChunk`]s that
363/// can be decoded into [`Row`]s.
364pub fn render_fetch_work<G, S, F>(
365    scope: G,
366    collection_id: GlobalId,
367    source: S,
368    format: F,
369    work_requests: &TimelyStream<G, Result<F::WorkRequest<S>, StorageErrorX>>,
370) -> (
371    TimelyStream<G, Result<F::RecordChunk, StorageErrorX>>,
372    PressOnDropButton,
373)
374where
375    G: Scope,
376    S: OneshotSource + Sync + 'static,
377    F: OneshotFormat + Sync + 'static,
378{
379    let worker_id = scope.index();
380    let mut builder = AsyncOperatorBuilder::new("CopyFrom-fetch_work".to_string(), scope.clone());
381
382    let (record_handle, record_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
383    let mut work_requests_handle = builder.new_input_for(work_requests, Distribute, &record_handle);
384
385    let shutdown = builder.build(move |caps| async move {
386        let [_work_cap] = caps.try_into().unwrap();
387
388        info!(%collection_id, %worker_id, "CopyFrom Fetch Work");
389
390        while let Some(event) = work_requests_handle.next().await {
391            let (capability, maybe_requests) = match event {
392                AsyncEvent::Data(cap, req) => (cap, req),
393                AsyncEvent::Progress(_) => continue,
394            };
395
396            // Wrap our work in a block to capture `?`.
397            let result = async {
398                // Process each stream of work, one at a time.
399                for maybe_request in maybe_requests {
400                    let request = maybe_request?;
401
402                    let mut work_stream = format.fetch_work(&source, request);
403                    while let Some(result) = work_stream.next().await {
404                        // Returns early and stop consuming from the stream if we hit an error.
405                        let record_chunk = result.context("fetch worker")?;
406
407                        // Note: We want to give record chunks as we receive them, because a work
408                        // request may be for an entire file.
409                        //
410                        // TODO(cf3): Investigate if some small batching here would help perf.
411                        record_handle.give(&capability, Ok(record_chunk));
412                    }
413                }
414
415                Ok::<_, StorageErrorX>(())
416            }
417            .await
418            .context("fetch work");
419
420            if let Err(err) = result {
421                tracing::warn!(?err, "failed to fetch");
422                record_handle.give(&capability, Err(err))
423            }
424        }
425    });
426
427    (record_stream, shutdown.press_on_drop())
428}
429
430/// Render an operator that given a stream of [`OneshotFormat::RecordChunk`]s will decode these
431/// chunks into a stream of [`Row`]s.
432pub fn render_decode_chunk<G, F>(
433    scope: G,
434    format: F,
435    record_chunks: &TimelyStream<G, Result<F::RecordChunk, StorageErrorX>>,
436    mfp: SafeMfpPlan,
437) -> (
438    TimelyStream<G, Result<Row, StorageErrorX>>,
439    PressOnDropButton,
440)
441where
442    G: Scope,
443    F: OneshotFormat + 'static,
444{
445    let mut builder = AsyncOperatorBuilder::new("CopyFrom-decode_chunk".to_string(), scope.clone());
446
447    let (row_handle, row_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
448    let mut record_chunk_handle = builder.new_input_for(record_chunks, Distribute, &row_handle);
449
450    let shutdown = builder.build(move |caps| async move {
451        let [_row_cap] = caps.try_into().unwrap();
452
453        let mut datum_vec = DatumVec::default();
454        let row_arena = RowArena::default();
455        let mut row_buf = Row::default();
456
457        while let Some(event) = record_chunk_handle.next().await {
458            let (capability, maybe_chunks) = match event {
459                AsyncEvent::Data(cap, data) => (cap, data),
460                AsyncEvent::Progress(_) => continue,
461            };
462
463            let result = async {
464                let mut rows = Vec::new();
465                for maybe_chunk in maybe_chunks {
466                    let chunk = maybe_chunk?;
467                    format.decode_chunk(chunk, &mut rows)?;
468                }
469                Ok::<_, StorageErrorX>(rows)
470            }
471            .await
472            .context("decode chunk");
473
474            match result {
475                Ok(rows) => {
476                    // For each row of source data, we pass it through an MFP to re-arrange column
477                    // orders and/or fill in default values for missing columns.
478                    for row in rows {
479                        let mut datums = datum_vec.borrow_with(&row);
480                        let result = mfp
481                            .evaluate_into(&mut *datums, &row_arena, &mut row_buf)
482                            .map(|row| row.cloned());
483
484                        match result {
485                            Ok(Some(row)) => row_handle.give(&capability, Ok(row)),
486                            Ok(None) => {
487                                // We only expect the provided MFP to map rows from the source data
488                                // and project default values.
489                                mz_ore::soft_panic_or_log!("oneshot source MFP filtered out data!");
490                            }
491                            Err(e) => {
492                                let err = StorageErrorXKind::MfpEvalError(e.to_string().into())
493                                    .with_context("decode");
494                                row_handle.give(&capability, Err(err))
495                            }
496                        }
497                    }
498                }
499                Err(err) => row_handle.give(&capability, Err(err)),
500            }
501        }
502    });
503
504    (row_stream, shutdown.press_on_drop())
505}
506
/// Render an operator that given a stream of [`Row`]s will stage them in Persist and return a
/// stream of [`ProtoBatch`]es that can later be linked into a shard.
///
/// Rows arrive over a [`Pipeline`] pact. Each instance of the operator adds every `Ok` row it
/// receives to a single batch builder and, once its input is exhausted, emits exactly one
/// `Ok(ProtoBatch)`.
///
/// If an `Err` arrives on the input, the in-progress batch is finished and deleted, the error is
/// forwarded (with "stage batches" context), and the operator stops without reading further
/// input.
///
/// # Panics
///
/// The operator panics if the Persist client or writer cannot be opened, if a row cannot be
/// added to the batch, or if the batch cannot be finished.
pub fn render_stage_batches_operator<G>(
    scope: G,
    collection_id: GlobalId,
    collection_meta: &CollectionMetadata,
    persist_clients: Arc<PersistClientCache>,
    rows_stream: &TimelyStream<G, Result<Row, StorageErrorX>>,
) -> (
    TimelyStream<G, Result<ProtoBatch, StorageErrorX>>,
    PressOnDropButton,
)
where
    G: Scope,
{
    // Copy what we need out of the borrowed metadata so the operator closure can own it.
    let persist_location = collection_meta.persist_location.clone();
    let shard_id = collection_meta.data_shard;
    let collection_desc = collection_meta.relation_desc.clone();

    let mut builder =
        AsyncOperatorBuilder::new("CopyFrom-stage_batches".to_string(), scope.clone());

    let (proto_batch_handle, proto_batch_stream) =
        builder.new_output::<CapacityContainerBuilder<_>>();
    let mut rows_handle = builder.new_input_for(rows_stream, Pipeline, &proto_batch_handle);

    let shutdown = builder.build(move |caps| async move {
        // All output is emitted with the operator's initial capability.
        let [proto_batch_cap] = caps.try_into().unwrap();

        // Open a Persist handle that we can use to stage a batch.
        let persist_client = persist_clients
            .open(persist_location)
            .await
            .expect("failed to open Persist client");
        let persist_diagnostics = Diagnostics {
            shard_name: collection_id.to_string(),
            handle_purpose: "CopyFrom::stage_batches".to_string(),
        };
        let write_handle = persist_client
            .open_writer::<SourceData, (), mz_repr::Timestamp, StorageDiff>(
                shard_id,
                Arc::new(collection_desc),
                Arc::new(UnitSchema),
                persist_diagnostics,
            )
            .await
            .expect("could not open Persist shard");

        // Create a batch using the minimum timestamp since these batches will
        // get sent back to `environmentd` and their timestamps re-written
        // before being finally appended.
        let lower = mz_repr::Timestamp::MIN;
        let upper = Antichain::from_elem(lower.step_forward());

        let mut batch_builder = write_handle.builder(Antichain::from_elem(lower));

        while let Some(event) = rows_handle.next().await {
            // Progress events carry no rows; only data events are of interest.
            let AsyncEvent::Data(_, row_batch) = event else {
                continue;
            };

            // Pull Rows off our stream and stage them into a Batch.
            for maybe_row in row_batch {
                match maybe_row {
                    // Happy path, add the Row to our batch!
                    Ok(row) => {
                        let data = SourceData(Ok(row));
                        // Every row is added at time `lower` with a diff of 1.
                        batch_builder
                            .add(&data, &(), &lower, &1)
                            .await
                            .expect("failed to add Row to batch");
                    }
                    // Sad path, something upstream hit an error.
                    Err(err) => {
                        // Clean up our in-progress batch so we don't leak data.
                        let batch = batch_builder
                            .finish(upper)
                            .await
                            .expect("failed to cleanup batch");
                        batch.delete().await;

                        // Pass on the error.
                        proto_batch_handle
                            .give(&proto_batch_cap, Err(err).context("stage batches"));
                        // Stop the operator: remaining input is not read and no batch is
                        // emitted.
                        return;
                    }
                }
            }
        }

        // The input is exhausted without errors, so seal the batch.
        let batch = batch_builder
            .finish(upper)
            .await
            .expect("failed to create Batch");

        // Turn our Batch into a ProtoBatch that will later be linked in to
        // the shard.
        //
        // Note: By turning this into a ProtoBatch, the onus is now on us to
        // cleanup the Batch if it's never linked into the shard.
        //
        // TODO(cf2): Make sure these batches get cleaned up if another
        // worker encounters an error.
        let proto_batch = batch.into_transmittable_batch();
        proto_batch_handle.give(&proto_batch_cap, Ok(proto_batch));
    });

    (proto_batch_stream, shutdown.press_on_drop())
}
616
617/// Render an operator that given a stream of [`ProtoBatch`]es will call our `worker_callback` to
618/// report the results upstream.
619pub fn render_completion_operator<G, F>(
620    scope: G,
621    results_stream: &TimelyStream<G, Result<ProtoBatch, StorageErrorX>>,
622    worker_callback: F,
623) where
624    G: Scope,
625    F: FnOnce(Result<Option<ProtoBatch>, String>) -> () + 'static,
626{
627    let mut builder = AsyncOperatorBuilder::new("CopyFrom-completion".to_string(), scope.clone());
628    let mut results_input = builder.new_disconnected_input(results_stream, Pipeline);
629
630    builder.build(move |_| async move {
631        let result = async move {
632            let mut maybe_payload: Option<ProtoBatch> = None;
633
634            while let Some(event) = results_input.next().await {
635                if let AsyncEvent::Data(_cap, results) = event {
636                    let [result] = results
637                        .try_into()
638                        .expect("only 1 event on the result stream");
639
640                    // TODO(cf2): Lift this restriction.
641                    if maybe_payload.is_some() {
642                        panic!("expected only one batch!");
643                    }
644
645                    maybe_payload = Some(result.map_err(|e| e.to_string())?);
646                }
647            }
648
649            Ok(maybe_payload)
650        }
651        .await;
652
653        // Report to the caller of our final status.
654        worker_callback(result);
655    });
656}
657
/// An object that will be fetched from a [`OneshotSource`].
///
/// Implementors describe a single remote item (e.g. a file): what it is called, where it lives,
/// how large it is, and how it is encoded as a whole.
pub trait OneshotObject {
    /// Name of the object, including any extensions.
    fn name(&self) -> &str;

    /// Path of the object within the remote source.
    fn path(&self) -> &str;

    /// Size of this object in bytes.
    fn size(&self) -> usize;

    /// Encodings of the _entire_ object, if any.
    ///
    /// Note: The object may internally use compression, e.g. a Parquet file
    /// could compress its column chunks, but if the Parquet file itself is not
    /// compressed then this would report no encodings (presumably an empty
    /// slice, since the return type is not an `Option` — TODO confirm against
    /// implementors).
    fn encodings(&self) -> &[Encoding];
}
676
/// Encoding of a [`OneshotObject`].
///
/// Each variant names an encoding that can be applied to an object as a whole, as reported by
/// [`OneshotObject::encodings`].
#[derive(Debug, Copy, Clone, Serialize, Deserialize)]
pub enum Encoding {
    /// The object is bzip2 encoded.
    Bzip2,
    /// The object is gzip encoded.
    Gzip,
    /// The object is xz encoded.
    Xz,
    /// The object is zstd encoded.
    Zstd,
}
685
/// Defines a remote system that we can fetch data from for a "one time" ingestion.
///
/// A source knows how to enumerate its objects ([`OneshotSource::list`]) and how to stream the
/// contents of a single object ([`OneshotSource::get`]).
pub trait OneshotSource: Clone + Send + Unpin {
    /// An individual unit within the source, e.g. a file.
    type Object: OneshotObject
        + Debug
        + Clone
        + Send
        + Unpin
        + Serialize
        + DeserializeOwned
        + 'static;
    /// Checksum for a [`Self::Object`].
    type Checksum: Debug + Clone + Send + Unpin + Serialize + DeserializeOwned + 'static;

    /// Returns all of the objects for this source, each paired with its checksum.
    fn list<'a>(
        &'a self,
    ) -> impl Future<Output = Result<Vec<(Self::Object, Self::Checksum)>, StorageErrorX>> + Send;

    /// Returns a stream of the data for a specific object.
    ///
    /// `range` optionally restricts the request to an inclusive range (presumably of byte
    /// offsets, with `None` meaning the whole object — TODO confirm against implementors).
    fn get<'s>(
        &'s self,
        object: Self::Object,
        checksum: Self::Checksum,
        range: Option<std::ops::RangeInclusive<usize>>,
    ) -> BoxStream<'s, Result<Bytes, StorageErrorX>>;
}
713
/// An enum wrapper around [`OneshotSource`]s.
///
/// An alternative to this wrapper would be to use `Box<dyn OneshotSource>`, but that requires
/// making the trait object safe and it's easier to just wrap it in an enum. Also, this wrapper
/// provides a convenient place to add [`StorageErrorXContext::context`] for all of our source
/// types.
#[derive(Clone, Debug)]
pub(crate) enum SourceKind {
    /// A source backed by [`HttpOneshotSource`].
    Http(HttpOneshotSource),
    /// A source backed by [`AwsS3Source`].
    AwsS3(AwsS3Source),
}
725
726impl OneshotSource for SourceKind {
727    type Object = ObjectKind;
728    type Checksum = ChecksumKind;
729
730    async fn list<'a>(&'a self) -> Result<Vec<(Self::Object, Self::Checksum)>, StorageErrorX> {
731        match self {
732            SourceKind::Http(http) => {
733                let objects = http.list().await.context("http")?;
734                let objects = objects
735                    .into_iter()
736                    .map(|(object, checksum)| {
737                        (ObjectKind::Http(object), ChecksumKind::Http(checksum))
738                    })
739                    .collect();
740                Ok(objects)
741            }
742            SourceKind::AwsS3(s3) => {
743                let objects = s3.list().await.context("s3")?;
744                let objects = objects
745                    .into_iter()
746                    .map(|(object, checksum)| {
747                        (ObjectKind::AwsS3(object), ChecksumKind::AwsS3(checksum))
748                    })
749                    .collect();
750                Ok(objects)
751            }
752        }
753    }
754
755    fn get<'s>(
756        &'s self,
757        object: Self::Object,
758        checksum: Self::Checksum,
759        range: Option<std::ops::RangeInclusive<usize>>,
760    ) -> BoxStream<'s, Result<Bytes, StorageErrorX>> {
761        match (self, object, checksum) {
762            (SourceKind::Http(http), ObjectKind::Http(object), ChecksumKind::Http(checksum)) => {
763                http.get(object, checksum, range)
764                    .map(|result| result.context("http"))
765                    .boxed()
766            }
767            (SourceKind::AwsS3(s3), ObjectKind::AwsS3(object), ChecksumKind::AwsS3(checksum)) => s3
768                .get(object, checksum, range)
769                .map(|result| result.context("aws_s3"))
770                .boxed(),
771            (SourceKind::AwsS3(_) | SourceKind::Http(_), _, _) => {
772                unreachable!("programming error! wrong source, object, and checksum kind");
773            }
774        }
775    }
776}
777
/// Enum wrapper for [`OneshotSource::Object`], see [`SourceKind`] for more details.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) enum ObjectKind {
    /// An object listed by the HTTP source.
    Http(HttpObject),
    /// An object listed by the AWS S3 source.
    AwsS3(S3Object),
}
784
785impl OneshotObject for ObjectKind {
786    fn name(&self) -> &str {
787        match self {
788            ObjectKind::Http(object) => object.name(),
789            ObjectKind::AwsS3(object) => object.name(),
790        }
791    }
792
793    fn path(&self) -> &str {
794        match self {
795            ObjectKind::Http(object) => object.path(),
796            ObjectKind::AwsS3(object) => object.path(),
797        }
798    }
799
800    fn size(&self) -> usize {
801        match self {
802            ObjectKind::Http(object) => object.size(),
803            ObjectKind::AwsS3(object) => object.size(),
804        }
805    }
806
807    fn encodings(&self) -> &[Encoding] {
808        match self {
809            ObjectKind::Http(object) => object.encodings(),
810            ObjectKind::AwsS3(object) => object.encodings(),
811        }
812    }
813}
814
/// Enum wrapper for [`OneshotSource::Checksum`], see [`SourceKind`] for more details.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) enum ChecksumKind {
    /// A checksum produced by the HTTP source.
    Http(HttpChecksum),
    /// A checksum produced by the AWS S3 source.
    AwsS3(S3Checksum),
}
821
822/// Defines a format that we fetch for a "one time" ingestion.
823pub trait OneshotFormat: Clone {
824    /// A single unit of work for decoding this format, e.g. a single Parquet RowGroup.
825    type WorkRequest<S>: Debug + Clone + Send + Serialize + DeserializeOwned + 'static
826    where
827        S: OneshotSource;
828    /// A chunk of records in this format that can be decoded into Rows.
829    type RecordChunk: Debug + Clone + Send + Serialize + DeserializeOwned + 'static;
830
831    /// Given an upstream object, defines how we should parse this object in parallel.
832    ///
833    /// Note: It's totally fine to not process an object in parallel, and just return a single
834    /// [`Self::WorkRequest`] here.
835    fn split_work<S: OneshotSource + Send>(
836        &self,
837        source: S,
838        object: S::Object,
839        checksum: S::Checksum,
840    ) -> impl Future<Output = Result<Vec<Self::WorkRequest<S>>, StorageErrorX>> + Send;
841
842    /// Given a work request, fetch data from the [`OneshotSource`] and return it in a format that
843    /// can later be decoded.
844    fn fetch_work<'a, S: OneshotSource + Sync + 'static>(
845        &'a self,
846        source: &'a S,
847        request: Self::WorkRequest<S>,
848    ) -> BoxStream<'a, Result<Self::RecordChunk, StorageErrorX>>;
849
850    /// Decode a chunk of records into [`Row`]s.
851    fn decode_chunk(
852        &self,
853        chunk: Self::RecordChunk,
854        rows: &mut Vec<Row>,
855    ) -> Result<usize, StorageErrorX>;
856}
857
858/// An enum wrapper around [`OneshotFormat`]s.
859///
860/// An alternative to this wrapper would be to use `Box<dyn OneshotFormat>`, but that requires
861/// making the trait object safe and it's easier to just wrap it in an enum. Also, this wrapper
862/// provides a convenient place to add [`StorageErrorXContext::context`] for all of our format
863/// types.
864#[derive(Clone, Debug)]
865pub(crate) enum FormatKind {
866    Csv(CsvDecoder),
867    Parquet(ParquetFormat),
868}
869
870impl OneshotFormat for FormatKind {
871    type WorkRequest<S>
872        = RequestKind<S::Object, S::Checksum>
873    where
874        S: OneshotSource;
875    type RecordChunk = RecordChunkKind;
876
877    async fn split_work<S: OneshotSource + Send>(
878        &self,
879        source: S,
880        object: S::Object,
881        checksum: S::Checksum,
882    ) -> Result<Vec<Self::WorkRequest<S>>, StorageErrorX> {
883        match self {
884            FormatKind::Csv(csv) => {
885                let work = csv
886                    .split_work(source, object, checksum)
887                    .await
888                    .context("csv")?
889                    .into_iter()
890                    .map(RequestKind::Csv)
891                    .collect();
892                Ok(work)
893            }
894            FormatKind::Parquet(parquet) => {
895                let work = parquet
896                    .split_work(source, object, checksum)
897                    .await
898                    .context("parquet")?
899                    .into_iter()
900                    .map(RequestKind::Parquet)
901                    .collect();
902                Ok(work)
903            }
904        }
905    }
906
907    fn fetch_work<'a, S: OneshotSource + Sync + 'static>(
908        &'a self,
909        source: &'a S,
910        request: Self::WorkRequest<S>,
911    ) -> BoxStream<'a, Result<Self::RecordChunk, StorageErrorX>> {
912        match (self, request) {
913            (FormatKind::Csv(csv), RequestKind::Csv(request)) => csv
914                .fetch_work(source, request)
915                .map_ok(RecordChunkKind::Csv)
916                .map(|result| result.context("csv"))
917                .boxed(),
918            (FormatKind::Parquet(parquet), RequestKind::Parquet(request)) => parquet
919                .fetch_work(source, request)
920                .map_ok(RecordChunkKind::Parquet)
921                .map(|result| result.context("parquet"))
922                .boxed(),
923            (FormatKind::Parquet(_), RequestKind::Csv(_))
924            | (FormatKind::Csv(_), RequestKind::Parquet(_)) => {
925                unreachable!("programming error, {self:?}")
926            }
927        }
928    }
929
930    fn decode_chunk(
931        &self,
932        chunk: Self::RecordChunk,
933        rows: &mut Vec<Row>,
934    ) -> Result<usize, StorageErrorX> {
935        match (self, chunk) {
936            (FormatKind::Csv(csv), RecordChunkKind::Csv(chunk)) => {
937                csv.decode_chunk(chunk, rows).context("csv")
938            }
939            (FormatKind::Parquet(parquet), RecordChunkKind::Parquet(chunk)) => {
940                parquet.decode_chunk(chunk, rows).context("parquet")
941            }
942            (FormatKind::Parquet(_), RecordChunkKind::Csv(_))
943            | (FormatKind::Csv(_), RecordChunkKind::Parquet(_)) => {
944                unreachable!("programming error, {self:?}")
945            }
946        }
947    }
948}
949
950#[derive(Clone, Debug, Serialize, Deserialize)]
951pub(crate) enum RequestKind<O, C> {
952    Csv(CsvWorkRequest<O, C>),
953    Parquet(ParquetWorkRequest<O, C>),
954}
955
956#[derive(Clone, Debug, Serialize, Deserialize)]
957pub(crate) enum RecordChunkKind {
958    Csv(CsvRecord),
959    Parquet(ParquetRowGroup),
960}
961
962pub(crate) enum ObjectFilter {
963    None,
964    Files(BTreeSet<Box<str>>),
965    Pattern(glob::Pattern),
966}
967
968impl ObjectFilter {
969    pub fn try_new(filter: ContentFilter) -> Result<Self, anyhow::Error> {
970        match filter {
971            ContentFilter::None => Ok(ObjectFilter::None),
972            ContentFilter::Files(files) => {
973                let files = files.into_iter().map(|f| f.into()).collect();
974                Ok(ObjectFilter::Files(files))
975            }
976            ContentFilter::Pattern(pattern) => {
977                let pattern = glob::Pattern::new(&pattern)?;
978                Ok(ObjectFilter::Pattern(pattern))
979            }
980        }
981    }
982
983    /// Returns if the object should be included.
984    pub fn filter<S: OneshotSource>(&self, object: &S::Object) -> bool {
985        match self {
986            ObjectFilter::None => true,
987            ObjectFilter::Files(files) => files.contains(object.path()),
988            ObjectFilter::Pattern(pattern) => pattern.matches(object.path()),
989        }
990    }
991}
992
993/// Experimental Error Type.
994///
995/// The goal of this type is to combine concepts from both `thiserror` and
996/// `anyhow`. Having "stongly typed" errors from `thiserror` is useful for
997/// determining what action to take and tracking the context of an error like
998/// `anyhow` is useful for determining where an error came from.
999#[derive(Debug, Clone, Deserialize, Serialize)]
1000pub struct StorageErrorX {
1001    kind: StorageErrorXKind,
1002    context: LinkedList<String>,
1003}
1004
1005impl fmt::Display for StorageErrorX {
1006    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1007        writeln!(f, "error: {}", self.kind)?;
1008        writeln!(f, "causes: {:?}", self.context)?;
1009        Ok(())
1010    }
1011}
1012
1013/// Experimental Error Type, see [`StorageErrorX`].
1014#[derive(Debug, Clone, Serialize, Deserialize, thiserror::Error)]
1015pub enum StorageErrorXKind {
1016    #[error("csv decoding error: {0}")]
1017    CsvDecoding(Arc<str>),
1018    #[error("parquet error: {0}")]
1019    ParquetError(Arc<str>),
1020    #[error("reqwest error: {0}")]
1021    Reqwest(Arc<str>),
1022    #[error("aws s3 request error: {0}")]
1023    AwsS3Request(String),
1024    #[error("aws s3 bytestream error: {0}")]
1025    AwsS3Bytes(Arc<str>),
1026    #[error("invalid reqwest header: {0}")]
1027    InvalidHeader(Arc<str>),
1028    #[error("failed to decode Row from a record batch: {0}")]
1029    InvalidRecordBatch(Arc<str>),
1030    #[error("programming error: {0}")]
1031    ProgrammingError(Arc<str>),
1032    #[error("failed to get the size of an object")]
1033    MissingSize,
1034    #[error("object is missing the required '{0}' field")]
1035    MissingField(Arc<str>),
1036    #[error("failed while evaluating the provided mfp: '{0}'")]
1037    MfpEvalError(Arc<str>),
1038    #[error("something went wrong: {0}")]
1039    Generic(String),
1040}
1041
1042impl From<csv_async::Error> for StorageErrorXKind {
1043    fn from(err: csv_async::Error) -> Self {
1044        StorageErrorXKind::CsvDecoding(err.to_string().into())
1045    }
1046}
1047
1048impl From<reqwest::Error> for StorageErrorXKind {
1049    fn from(err: reqwest::Error) -> Self {
1050        StorageErrorXKind::Reqwest(err.to_string().into())
1051    }
1052}
1053
1054impl From<reqwest::header::ToStrError> for StorageErrorXKind {
1055    fn from(err: reqwest::header::ToStrError) -> Self {
1056        StorageErrorXKind::InvalidHeader(err.to_string().into())
1057    }
1058}
1059
1060impl From<aws_smithy_types::byte_stream::error::Error> for StorageErrorXKind {
1061    fn from(err: aws_smithy_types::byte_stream::error::Error) -> Self {
1062        StorageErrorXKind::AwsS3Request(err.to_string())
1063    }
1064}
1065
1066impl From<::parquet::errors::ParquetError> for StorageErrorXKind {
1067    fn from(err: ::parquet::errors::ParquetError) -> Self {
1068        StorageErrorXKind::ParquetError(err.to_string().into())
1069    }
1070}
1071
1072impl StorageErrorXKind {
1073    pub fn with_context<C: Display>(self, context: C) -> StorageErrorX {
1074        StorageErrorX {
1075            kind: self,
1076            context: LinkedList::from([context.to_string()]),
1077        }
1078    }
1079
1080    pub fn invalid_record_batch<S: Into<Arc<str>>>(error: S) -> StorageErrorXKind {
1081        StorageErrorXKind::InvalidRecordBatch(error.into())
1082    }
1083
1084    pub fn generic<C: Display>(error: C) -> StorageErrorXKind {
1085        StorageErrorXKind::Generic(error.to_string())
1086    }
1087
1088    pub fn programming_error<S: Into<Arc<str>>>(error: S) -> StorageErrorXKind {
1089        StorageErrorXKind::ProgrammingError(error.into())
1090    }
1091}
1092
1093impl<E> From<E> for StorageErrorX
1094where
1095    E: Into<StorageErrorXKind>,
1096{
1097    fn from(err: E) -> Self {
1098        StorageErrorX {
1099            kind: err.into(),
1100            context: LinkedList::new(),
1101        }
1102    }
1103}
1104
1105trait StorageErrorXContext<T> {
1106    fn context<C>(self, context: C) -> Result<T, StorageErrorX>
1107    where
1108        C: Display;
1109}
1110
1111impl<T, E> StorageErrorXContext<T> for Result<T, E>
1112where
1113    E: Into<StorageErrorXKind>,
1114{
1115    fn context<C>(self, context: C) -> Result<T, StorageErrorX>
1116    where
1117        C: Display,
1118    {
1119        match self {
1120            Ok(val) => Ok(val),
1121            Err(kind) => Err(StorageErrorX {
1122                kind: kind.into(),
1123                context: LinkedList::from([context.to_string()]),
1124            }),
1125        }
1126    }
1127}
1128
1129impl<T> StorageErrorXContext<T> for Result<T, StorageErrorX> {
1130    fn context<C>(self, context: C) -> Result<T, StorageErrorX>
1131    where
1132        C: Display,
1133    {
1134        match self {
1135            Ok(val) => Ok(val),
1136            Err(mut e) => {
1137                e.context.push_back(context.to_string());
1138                Err(e)
1139            }
1140        }
1141    }
1142}