Skip to main content

mz_storage_operators/
oneshot_source.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! "Oneshot" sources are a one-time ingestion of data from an external system, unlike traditional
11//! sources, they __do not__ run continuously. Oneshot sources are generally used for `COPY FROM`
12//! SQL statements.
13//!
14//! The implementation of reading and parsing data is behind the [`OneshotSource`] and
15//! [`OneshotFormat`] traits, respectively. Users looking to add new sources or formats, should
16//! only need to add new implementations for these traits.
17//!
18//! * [`OneshotSource`] is an interface for listing and reading from an external system, e.g. an
19//!   HTTP server.
20//! * [`OneshotFormat`] is an interface for how to parallelize and parse data, e.g. CSV.
21//!
22//! Given a [`OneshotSource`] and a [`OneshotFormat`] we build a dataflow structured like the
23//! following:
24//!
25//! ```text
26//!             ┏━━━━━━━━━━━━━━━┓
27//!             ┃    Discover   ┃
28//!             ┃    objects    ┃
29//!             ┗━━━━━━━┯━━━━━━━┛
30//!           ┌───< Distribute >───┐
31//!           │                    │
32//!     ┏━━━━━v━━━━┓         ┏━━━━━v━━━━┓
33//!     ┃  Split   ┃   ...   ┃  Split   ┃
34//!     ┃  Work 1  ┃         ┃  Work n  ┃
35//!     ┗━━━━━┯━━━━┛         ┗━━━━━┯━━━━┛
36//!           │                    │
37//!           ├───< Distribute >───┤
38//!           │                    │
39//!     ┏━━━━━v━━━━┓         ┏━━━━━v━━━━┓
40//!     ┃  Fetch   ┃   ...   ┃  Fetch   ┃
41//!     ┃  Work 1  ┃         ┃  Work n  ┃
42//!     ┗━━━━━┯━━━━┛         ┗━━━━━┯━━━━┛
43//!           │                    │
44//!           ├───< Distribute >───┤
45//!           │                    │
46//!     ┏━━━━━v━━━━┓         ┏━━━━━v━━━━┓
47//!     ┃  Decode  ┃   ...   ┃  Decode  ┃
48//!     ┃  Chunk 1 ┃         ┃  Chunk n ┃
49//!     ┗━━━━━┯━━━━┛         ┗━━━━━┯━━━━┛
50//!           │                    │
51//!           │                    │
52//!     ┏━━━━━v━━━━┓         ┏━━━━━v━━━━┓
53//!     ┃  Stage   ┃   ...   ┃  Stage   ┃
54//!     ┃  Batch 1 ┃         ┃  Batch n ┃
55//!     ┗━━━━━┯━━━━┛         ┗━━━━━┯━━━━┛
56//!           │                    │
57//!           └─────────┬──────────┘
58//!               ┏━━━━━v━━━━┓
59//!               ┃  Result  ┃
60//!               ┃ Callback ┃
61//!               ┗━━━━━━━━━━┛
62//! ```
63//!
64
65use std::fmt;
66use std::sync::Arc;
67
68use bytes::Bytes;
69use differential_dataflow::Hashable;
70use futures::stream::BoxStream;
71use futures::{StreamExt, TryStreamExt};
72use mz_expr::SafeMfpPlan;
73use mz_ore::cast::CastFrom;
74use mz_persist_client::Diagnostics;
75use mz_persist_client::batch::ProtoBatch;
76use mz_persist_client::cache::PersistClientCache;
77use mz_persist_types::Codec;
78use mz_persist_types::codec_impls::UnitSchema;
79use mz_repr::{DatumVec, GlobalId, Row, RowArena, Timestamp};
80use mz_storage_types::StorageDiff;
81use mz_storage_types::connections::ConnectionContext;
82use mz_storage_types::controller::CollectionMetadata;
83use mz_storage_types::oneshot_sources::{
84    ContentFilter, ContentFormat, ContentSource, OneshotIngestionRequest,
85};
86use mz_storage_types::sources::SourceData;
87use mz_timely_util::builder_async::{
88    Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
89};
90use mz_timely_util::pact::Distribute;
91use serde::de::DeserializeOwned;
92use serde::{Deserialize, Serialize};
93use std::collections::{BTreeSet, LinkedList};
94use std::fmt::{Debug, Display};
95use std::future::Future;
96use timely::container::CapacityContainerBuilder;
97use timely::dataflow::channels::pact::Pipeline;
98use timely::dataflow::{Scope, StreamVec};
99use timely::progress::Antichain;
100use tracing::info;
101
102use crate::oneshot_source::aws_source::{AwsS3Source, S3Checksum, S3Object};
103use crate::oneshot_source::csv::{CsvDecoder, CsvRecord, CsvWorkRequest};
104use crate::oneshot_source::http_source::{HttpChecksum, HttpObject, HttpOneshotSource};
105use crate::oneshot_source::parquet::{ParquetFormat, ParquetRowGroup, ParquetWorkRequest};
106
107pub mod csv;
108pub mod parquet;
109
110pub mod aws_source;
111pub mod http_source;
112
113mod util;
114
115/// Render a dataflow to do a "oneshot" ingestion.
116///
117/// Roughly the operators we render do the following:
118///
119/// 1. Discover objects with a [`OneshotSource`].
120/// 2. Split objects into separate units of work based on the [`OneshotFormat`].
121/// 3. Fetch individual units of work (aka fetch byte blobs) with the
122///    [`OneshotFormat`] and [`OneshotSource`].
123/// 4. Decode the fetched byte blobs into [`Row`]s.
124/// 5. Stage the [`Row`]s into Persist returning [`ProtoBatch`]es.
125///
126/// TODO(cf3): Benchmark combining operators 3, 4, and 5. Currently we keep them
127/// separate for the [`CsvDecoder`]. CSV decoding is hard to do in parallel so we
128/// currently have a single worker Fetch an entire file, and then distributes
129/// chunks for parallel Decoding. We should benchmark if this is actually faster
130/// than just a single worker both fetching and decoding.
131///
132pub fn render<G, F>(
133    scope: G,
134    persist_clients: Arc<PersistClientCache>,
135    connection_context: ConnectionContext,
136    collection_id: GlobalId,
137    collection_meta: CollectionMetadata,
138    request: OneshotIngestionRequest,
139    worker_callback: F,
140) -> Vec<PressOnDropButton>
141where
142    G: Scope<Timestamp = Timestamp>,
143    F: FnOnce(Result<Option<ProtoBatch>, String>) -> () + 'static,
144{
145    let OneshotIngestionRequest {
146        source,
147        format,
148        filter,
149        shape,
150    } = request;
151
152    let source = match source {
153        ContentSource::Http { url } => {
154            let source = HttpOneshotSource::new(reqwest::Client::default(), url);
155            SourceKind::Http(source)
156        }
157        ContentSource::AwsS3 {
158            connection,
159            connection_id,
160            uri,
161        } => {
162            let source = AwsS3Source::new(connection, connection_id, connection_context, uri);
163            SourceKind::AwsS3(source)
164        }
165    };
166    tracing::info!(?source, "created oneshot source");
167
168    let format = match format {
169        ContentFormat::Csv(params) => {
170            let format = CsvDecoder::new(params, &shape.source_desc);
171            FormatKind::Csv(format)
172        }
173        ContentFormat::Parquet => {
174            let format = ParquetFormat::new(shape.source_desc);
175            FormatKind::Parquet(format)
176        }
177    };
178
179    // Discover what objects are available to copy.
180    let (objects_stream, discover_token) =
181        render_discover_objects(scope.clone(), collection_id, source.clone(), filter);
182    // Split the objects into individual units of work.
183    let (work_stream, split_token) = render_split_work(
184        scope.clone(),
185        collection_id,
186        objects_stream,
187        source.clone(),
188        format.clone(),
189    );
190    // Fetch each unit of work, returning chunks of records.
191    let (records_stream, fetch_token) = render_fetch_work(
192        scope.clone(),
193        collection_id,
194        source.clone(),
195        format.clone(),
196        work_stream,
197    );
198    // Parse chunks of records into Rows.
199    let (rows_stream, decode_token) = render_decode_chunk(
200        scope.clone(),
201        format.clone(),
202        records_stream,
203        shape.source_mfp,
204    );
205    // Stage the Rows in Persist.
206    let (batch_stream, batch_token) = render_stage_batches_operator(
207        scope.clone(),
208        collection_id,
209        &collection_meta,
210        persist_clients,
211        rows_stream,
212    );
213
214    // Collect all results together and notify the upstream of whether or not we succeeded.
215    render_completion_operator(scope, batch_stream, worker_callback);
216
217    let tokens = vec![
218        discover_token,
219        split_token,
220        fetch_token,
221        decode_token,
222        batch_token,
223    ];
224
225    tokens
226}
227
/// Render an operator that using a [`OneshotSource`] will discover what objects are available
/// for fetching.
///
/// Exactly one worker (chosen by hashing `collection_id`) performs the listing; all
/// other workers emit nothing. Both successfully discovered objects and any listing
/// or filter errors are emitted on the returned stream so they flow downstream to
/// the completion operator.
pub fn render_discover_objects<G, S>(
    scope: G,
    collection_id: GlobalId,
    source: S,
    filter: ContentFilter,
) -> (
    StreamVec<G, Result<(S::Object, S::Checksum), StorageErrorX>>,
    PressOnDropButton,
)
where
    G: Scope<Timestamp = Timestamp>,
    S: OneshotSource + 'static,
{
    // Only a single worker is responsible for discovering objects.
    let worker_id = scope.index();
    let num_workers = scope.peers();
    // Hash of (collection_id, "discover") is deterministic, so every worker agrees
    // on which one is active without any coordination.
    let active_worker_id = usize::cast_from((collection_id, "discover").hashed()) % num_workers;
    let is_active_worker = worker_id == active_worker_id;

    let mut builder = AsyncOperatorBuilder::new("CopyFrom-discover".to_string(), scope.clone());

    let (start_handle, start_stream) = builder.new_output::<CapacityContainerBuilder<_>>();

    let shutdown = builder.build(move |caps| async move {
        let [start_cap] = caps.try_into().unwrap();

        // Inactive workers return immediately, dropping their capability.
        if !is_active_worker {
            return;
        }

        // Compile the user-provided filter; a failure is reported downstream as
        // data rather than swallowed, so the ingestion as a whole fails cleanly.
        let filter = match ObjectFilter::try_new(filter) {
            Ok(filter) => filter,
            Err(err) => {
                tracing::warn!(?err, "failed to create filter");
                start_handle.give(&start_cap, Err(StorageErrorXKind::generic(err).into()));
                return;
            }
        };

        // List everything the source offers, then partition by the filter.
        let work = source.list().await.context("list");
        match work {
            Ok(objects) => {
                let (include, exclude): (Vec<_>, Vec<_>) = objects
                    .into_iter()
                    .partition(|(o, _checksum)| filter.filter::<S>(o));
                tracing::info!(%worker_id, ?include, ?exclude, "listed objects");
                // A filter matching nothing is surfaced as an explicit error
                // instead of silently ingesting zero rows.
                if include.is_empty() {
                    let err = StorageErrorXKind::NoMatchingFiles.with_context("discover");
                    start_handle.give(&start_cap, Err(err));
                    return;
                }
                include
                    .into_iter()
                    .for_each(|object| start_handle.give(&start_cap, Ok(object)))
            }
            Err(err) => {
                tracing::warn!(?err, "failed to list oneshot source");
                start_handle.give(&start_cap, Err(err))
            }
        }
    });

    (start_stream, shutdown.press_on_drop())
}
294
/// Render an operator that given a stream of [`OneshotSource::Object`]s will split them into units
/// of work based on the provided [`OneshotFormat`].
///
/// Input objects arrive via the `Distribute` pact, so splitting is spread across
/// workers. Errors from the upstream Discover step are passed through, and any
/// error while splitting one batch of objects is emitted in place of its requests.
pub fn render_split_work<G, S, F>(
    scope: G,
    collection_id: GlobalId,
    objects: StreamVec<G, Result<(S::Object, S::Checksum), StorageErrorX>>,
    source: S,
    format: F,
) -> (
    StreamVec<G, Result<F::WorkRequest<S>, StorageErrorX>>,
    PressOnDropButton,
)
where
    G: Scope,
    S: OneshotSource + Send + Sync + 'static,
    F: OneshotFormat + Send + Sync + 'static,
{
    let worker_id = scope.index();
    let mut builder = AsyncOperatorBuilder::new("CopyFrom-split_work".to_string(), scope.clone());

    let (request_handle, request_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
    let mut objects_handle = builder.new_input_for(objects, Distribute, &request_handle);

    let shutdown = builder.build(move |caps| async move {
        let [_objects_cap] = caps.try_into().unwrap();

        info!(%collection_id, %worker_id, "CopyFrom Split Work");

        while let Some(event) = objects_handle.next().await {
            // Only data events carry objects; progress updates are ignored.
            let (capability, maybe_objects) = match event {
                AsyncEvent::Data(cap, req) => (cap, req),
                AsyncEvent::Progress(_) => continue,
            };

            // Nest the `split_work(...)` method in an async-block so we can use the `?`
            // without returning from the entire operator, and so we can add context.
            let result = async {
                let mut requests = Vec::new();

                for maybe_object in maybe_objects {
                    // Return early if the upstream Discover step failed.
                    let (object, checksum) = maybe_object?;

                    // Run the (possibly I/O heavy) split on a separate task;
                    // `format`/`source` are cloned so the task is 'static.
                    let format_ = format.clone();
                    let source_ = source.clone();
                    let work_requests = mz_ore::task::spawn(|| "split-work", async move {
                        info!(%worker_id, object = %object.name(), "splitting object");
                        format_.split_work(source_.clone(), object, checksum).await
                    })
                    .await?;

                    requests.extend(work_requests);
                }

                Ok::<_, StorageErrorX>(requests)
            }
            .await
            .context("split");

            // Either forward every generated work request, or the single error.
            match result {
                Ok(requests) => requests
                    .into_iter()
                    .for_each(|req| request_handle.give(&capability, Ok(req))),
                Err(err) => request_handle.give(&capability, Err(err)),
            }
        }
    });

    (request_stream, shutdown.press_on_drop())
}
365
/// Render an operator that given a stream [`OneshotFormat::WorkRequest`]s will fetch chunks of the
/// remote [`OneshotSource::Object`] and return a stream of [`OneshotFormat::RecordChunk`]s that
/// can be decoded into [`Row`]s.
///
/// Work requests are distributed across workers via the `Distribute` pact. Record
/// chunks are emitted eagerly as they stream in, rather than buffered per request.
pub fn render_fetch_work<G, S, F>(
    scope: G,
    collection_id: GlobalId,
    source: S,
    format: F,
    work_requests: StreamVec<G, Result<F::WorkRequest<S>, StorageErrorX>>,
) -> (
    StreamVec<G, Result<F::RecordChunk, StorageErrorX>>,
    PressOnDropButton,
)
where
    G: Scope,
    S: OneshotSource + Sync + 'static,
    F: OneshotFormat + Sync + 'static,
{
    let worker_id = scope.index();
    let mut builder = AsyncOperatorBuilder::new("CopyFrom-fetch_work".to_string(), scope.clone());

    let (record_handle, record_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
    let mut work_requests_handle = builder.new_input_for(work_requests, Distribute, &record_handle);

    let shutdown = builder.build(move |caps| async move {
        let [_work_cap] = caps.try_into().unwrap();

        info!(%collection_id, %worker_id, "CopyFrom Fetch Work");

        while let Some(event) = work_requests_handle.next().await {
            // Only data events carry work requests; progress updates are ignored.
            let (capability, maybe_requests) = match event {
                AsyncEvent::Data(cap, req) => (cap, req),
                AsyncEvent::Progress(_) => continue,
            };

            // Wrap our work in a block to capture `?`.
            let result = async {
                // Process each stream of work, one at a time.
                for maybe_request in maybe_requests {
                    // Propagate errors from the upstream Split step.
                    let request = maybe_request?;

                    let mut work_stream = format.fetch_work(&source, request);
                    while let Some(result) = work_stream.next().await {
                        // Returns early and stop consuming from the stream if we hit an error.
                        let record_chunk = result.context("fetch worker")?;

                        // Note: We want to give record chunks as we receive them, because a work
                        // request may be for an entire file.
                        //
                        // TODO(cf3): Investigate if some small batching here would help perf.
                        record_handle.give(&capability, Ok(record_chunk));
                    }
                }

                Ok::<_, StorageErrorX>(())
            }
            .await
            .context("fetch work");

            // Any chunks emitted before the failure have already been given;
            // the error itself is appended so downstream can observe it.
            if let Err(err) = result {
                tracing::warn!(?err, "failed to fetch");
                record_handle.give(&capability, Err(err))
            }
        }
    });

    (record_stream, shutdown.press_on_drop())
}
434
/// Render an operator that given a stream of [`OneshotFormat::RecordChunk`]s will decode these
/// chunks into a stream of [`Row`]s.
///
/// Each decoded row is additionally passed through `mfp` (a map/filter/project plan)
/// to re-arrange columns and fill in defaults before being emitted.
pub fn render_decode_chunk<G, F>(
    scope: G,
    format: F,
    record_chunks: StreamVec<G, Result<F::RecordChunk, StorageErrorX>>,
    mfp: SafeMfpPlan,
) -> (StreamVec<G, Result<Row, StorageErrorX>>, PressOnDropButton)
where
    G: Scope,
    F: OneshotFormat + 'static,
{
    let mut builder = AsyncOperatorBuilder::new("CopyFrom-decode_chunk".to_string(), scope.clone());

    let (row_handle, row_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
    let mut record_chunk_handle = builder.new_input_for(record_chunks, Distribute, &row_handle);

    let shutdown = builder.build(move |caps| async move {
        let [_row_cap] = caps.try_into().unwrap();

        // Scratch buffers reused across all chunks to avoid per-row allocations.
        let mut datum_vec = DatumVec::default();
        let row_arena = RowArena::default();
        let mut row_buf = Row::default();

        while let Some(event) = record_chunk_handle.next().await {
            // Only data events carry chunks; progress updates are ignored.
            let (capability, maybe_chunks) = match event {
                AsyncEvent::Data(cap, data) => (cap, data),
                AsyncEvent::Progress(_) => continue,
            };

            // Async block so `?` short-circuits decoding without returning from
            // the whole operator, and so we can attach context.
            let result = async {
                let mut rows = Vec::new();
                for maybe_chunk in maybe_chunks {
                    let chunk = maybe_chunk?;
                    format.decode_chunk(chunk, &mut rows)?;
                }
                Ok::<_, StorageErrorX>(rows)
            }
            .await
            .context("decode chunk");

            match result {
                Ok(rows) => {
                    // For each row of source data, we pass it through an MFP to re-arrange column
                    // orders and/or fill in default values for missing columns.
                    for row in rows {
                        let mut datums = datum_vec.borrow_with(&row);
                        let result = mfp
                            .evaluate_into(&mut *datums, &row_arena, &mut row_buf)
                            .map(|row| row.cloned());

                        match result {
                            Ok(Some(row)) => row_handle.give(&capability, Ok(row)),
                            Ok(None) => {
                                // We only expect the provided MFP to map rows from the source data
                                // and project default values.
                                mz_ore::soft_panic_or_log!("oneshot source MFP filtered out data!");
                            }
                            Err(e) => {
                                // Surface MFP evaluation failures as data so they
                                // reach the completion operator.
                                let err = StorageErrorXKind::MfpEvalError(e.to_string().into())
                                    .with_context("decode");
                                row_handle.give(&capability, Err(err))
                            }
                        }
                    }
                }
                Err(err) => row_handle.give(&capability, Err(err)),
            }
        }
    });

    (row_stream, shutdown.press_on_drop())
}
508
/// Render an operator that given a stream of [`Row`]s will stage them in Persist and return a
/// stream of [`ProtoBatch`]es that can later be linked into a shard.
///
/// Uses the `Pipeline` pact: each worker stages whatever rows it decoded, producing
/// at most one batch per worker. If any upstream error arrives, the in-progress
/// batch is deleted and the error is forwarded instead of a batch.
pub fn render_stage_batches_operator<G>(
    scope: G,
    collection_id: GlobalId,
    collection_meta: &CollectionMetadata,
    persist_clients: Arc<PersistClientCache>,
    rows_stream: StreamVec<G, Result<Row, StorageErrorX>>,
) -> (
    StreamVec<G, Result<ProtoBatch, StorageErrorX>>,
    PressOnDropButton,
)
where
    G: Scope,
{
    // Clone the pieces of metadata the async closure needs, since `collection_meta`
    // is only borrowed.
    let persist_location = collection_meta.persist_location.clone();
    let shard_id = collection_meta.data_shard;
    let collection_desc = Arc::new(collection_meta.relation_desc.clone());

    let mut builder =
        AsyncOperatorBuilder::new("CopyFrom-stage_batches".to_string(), scope.clone());

    let (proto_batch_handle, proto_batch_stream) =
        builder.new_output::<CapacityContainerBuilder<_>>();
    let mut rows_handle = builder.new_input_for(rows_stream, Pipeline, &proto_batch_handle);

    let shutdown = builder.build(move |caps| async move {
        let [proto_batch_cap] = caps.try_into().unwrap();

        // Open a Persist handle that we can use to stage a batch.
        let persist_client = persist_clients
            .open(persist_location)
            .await
            .expect("failed to open Persist client");
        let persist_diagnostics = Diagnostics {
            shard_name: collection_id.to_string(),
            handle_purpose: "CopyFrom::stage_batches".to_string(),
        };
        let write_handle = persist_client
            .open_writer::<SourceData, (), mz_repr::Timestamp, StorageDiff>(
                shard_id,
                Arc::clone(&collection_desc),
                Arc::new(UnitSchema),
                persist_diagnostics,
            )
            .await
            .expect("could not open Persist shard");

        // Create a batch using the minimum timestamp since these batches will
        // get sent back to `environmentd` and their timestamps re-written
        // before being finally appended.
        let lower = mz_repr::Timestamp::MIN;
        let upper = Antichain::from_elem(lower.step_forward());

        let mut batch_builder = write_handle.builder(Antichain::from_elem(lower));

        while let Some(event) = rows_handle.next().await {
            // Only data events matter; progress updates are skipped.
            let AsyncEvent::Data(_, row_batch) = event else {
                continue;
            };

            // Pull Rows off our stream and stage them into a Batch.
            for maybe_row in row_batch {
                // Validate each row against the collection's schema before staging.
                let maybe_row = maybe_row.and_then(|row| {
                    Row::validate(&row, &*collection_desc).map_err(|e| {
                        StorageErrorXKind::invalid_record_batch(e).with_context("stage_batches")
                    })?;
                    Ok(row)
                });
                match maybe_row {
                    // Happy path, add the Row to our batch!
                    Ok(row) => {
                        let data = SourceData(Ok(row));
                        batch_builder
                            .add(&data, &(), &lower, &1)
                            .await
                            .expect("failed to add Row to batch");
                    }
                    // Sad path, something upstream hit an error.
                    Err(err) => {
                        // Clean up our in-progress batch so we don't leak data.
                        let batch = batch_builder
                            .finish(upper)
                            .await
                            .expect("failed to cleanup batch");
                        batch.delete().await;

                        // Pass on the error.
                        proto_batch_handle
                            .give(&proto_batch_cap, Err(err).context("stage batches"));
                        return;
                    }
                }
            }
        }

        let batch = batch_builder
            .finish(upper)
            .await
            .expect("failed to create Batch");

        // Turn our Batch into a ProtoBatch that will later be linked in to
        // the shard.
        //
        // Note: By turning this into a ProtoBatch, the onus is now on us to
        // cleanup the Batch if it's never linked into the shard.
        //
        // TODO(cf2): Make sure these batches get cleaned up if another
        // worker encounters an error.
        let proto_batch = batch.into_transmittable_batch();
        proto_batch_handle.give(&proto_batch_cap, Ok(proto_batch));
    });

    (proto_batch_stream, shutdown.press_on_drop())
}
624
625/// Render an operator that given a stream of [`ProtoBatch`]es will call our `worker_callback` to
626/// report the results upstream.
627pub fn render_completion_operator<G, F>(
628    scope: G,
629    results_stream: StreamVec<G, Result<ProtoBatch, StorageErrorX>>,
630    worker_callback: F,
631) where
632    G: Scope,
633    F: FnOnce(Result<Option<ProtoBatch>, String>) -> () + 'static,
634{
635    let mut builder = AsyncOperatorBuilder::new("CopyFrom-completion".to_string(), scope.clone());
636    let mut results_input = builder.new_disconnected_input(results_stream, Pipeline);
637
638    builder.build(move |_| async move {
639        let result = async move {
640            let mut maybe_payload: Option<ProtoBatch> = None;
641
642            while let Some(event) = results_input.next().await {
643                if let AsyncEvent::Data(_cap, results) = event {
644                    let [result] = results
645                        .try_into()
646                        .expect("only 1 event on the result stream");
647
648                    // TODO(cf2): Lift this restriction.
649                    if maybe_payload.is_some() {
650                        panic!("expected only one batch!");
651                    }
652
653                    maybe_payload = Some(result.map_err(|e| e.to_string())?);
654                }
655            }
656
657            Ok(maybe_payload)
658        }
659        .await;
660
661        // Report to the caller of our final status.
662        worker_callback(result);
663    });
664}
665
/// An object that will be fetched from a [`OneshotSource`].
pub trait OneshotObject {
    /// Name of the object, including any extensions.
    fn name(&self) -> &str;

    /// Path of the object within the remote source.
    fn path(&self) -> &str;

    /// Size of this object in bytes.
    fn size(&self) -> usize;

    /// Encodings of the _entire_ object, if any.
    ///
    /// Note: The object may internally use compression, e.g. a Parquet file
    /// could compress its column chunks, but if the Parquet file itself is not
    /// compressed then this would return `None`.
    fn encodings(&self) -> &[Encoding];
}
684
/// Encoding of a [`OneshotObject`].
#[derive(Debug, Copy, Clone, Serialize, Deserialize)]
pub enum Encoding {
    /// bzip2 compression.
    Bzip2,
    /// gzip compression.
    Gzip,
    /// xz/LZMA compression.
    Xz,
    /// Zstandard compression.
    Zstd,
}
693
/// Defines a remote system that we can fetch data from for a "one time" ingestion.
pub trait OneshotSource: Clone + Send + Unpin {
    /// An individual unit within the source, e.g. a file.
    type Object: OneshotObject
        + Debug
        + Clone
        + Send
        + Unpin
        + Serialize
        + DeserializeOwned
        + 'static;
    /// Checksum for a [`Self::Object`].
    type Checksum: Debug + Clone + Send + Unpin + Serialize + DeserializeOwned + 'static;

    /// Returns all of the objects for this source.
    fn list<'a>(
        &'a self,
    ) -> impl Future<Output = Result<Vec<(Self::Object, Self::Checksum)>, StorageErrorX>> + Send;

    /// Returns a stream of the data for a specific object.
    ///
    /// `range`, when provided, restricts the fetch to the given inclusive byte
    /// range within the object.
    fn get<'s>(
        &'s self,
        object: Self::Object,
        checksum: Self::Checksum,
        range: Option<std::ops::RangeInclusive<usize>>,
    ) -> BoxStream<'s, Result<Bytes, StorageErrorX>>;
}
721
/// An enum wrapper around [`OneshotSource`]s.
///
/// An alternative to this wrapper would be to use `Box<dyn OneshotSource>`, but that requires
/// making the trait object safe and it's easier to just wrap it in an enum. Also, this wrapper
/// provides a convenient place to add [`StorageErrorXContext::context`] for all of our source
/// types.
#[derive(Clone, Debug)]
pub(crate) enum SourceKind {
    /// Fetch objects over HTTP(S).
    Http(HttpOneshotSource),
    /// Fetch objects from AWS S3.
    AwsS3(AwsS3Source),
}
733
// Dispatches to the wrapped source, tagging objects/checksums with the matching
// `ObjectKind`/`ChecksumKind` variant and attaching per-source error context.
impl OneshotSource for SourceKind {
    type Object = ObjectKind;
    type Checksum = ChecksumKind;

    async fn list<'a>(&'a self) -> Result<Vec<(Self::Object, Self::Checksum)>, StorageErrorX> {
        match self {
            SourceKind::Http(http) => {
                let objects = http.list().await.context("http")?;
                // Re-wrap the source-specific types in our enum variants.
                let objects = objects
                    .into_iter()
                    .map(|(object, checksum)| {
                        (ObjectKind::Http(object), ChecksumKind::Http(checksum))
                    })
                    .collect();
                Ok(objects)
            }
            SourceKind::AwsS3(s3) => {
                let objects = s3.list().await.context("s3")?;
                // Re-wrap the source-specific types in our enum variants.
                let objects = objects
                    .into_iter()
                    .map(|(object, checksum)| {
                        (ObjectKind::AwsS3(object), ChecksumKind::AwsS3(checksum))
                    })
                    .collect();
                Ok(objects)
            }
        }
    }

    fn get<'s>(
        &'s self,
        object: Self::Object,
        checksum: Self::Checksum,
        range: Option<std::ops::RangeInclusive<usize>>,
    ) -> BoxStream<'s, Result<Bytes, StorageErrorX>> {
        // The source, object, and checksum variants must all agree; mismatches
        // indicate a programming error elsewhere in the dataflow.
        match (self, object, checksum) {
            (SourceKind::Http(http), ObjectKind::Http(object), ChecksumKind::Http(checksum)) => {
                http.get(object, checksum, range)
                    .map(|result| result.context("http"))
                    .boxed()
            }
            (SourceKind::AwsS3(s3), ObjectKind::AwsS3(object), ChecksumKind::AwsS3(checksum)) => s3
                .get(object, checksum, range)
                .map(|result| result.context("aws_s3"))
                .boxed(),
            (SourceKind::AwsS3(_) | SourceKind::Http(_), _, _) => {
                unreachable!("programming error! wrong source, object, and checksum kind");
            }
        }
    }
}
785
/// Enum wrapper for [`OneshotSource::Object`], see [`SourceKind`] for more details.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) enum ObjectKind {
    /// An object served over HTTP(S).
    Http(HttpObject),
    /// An object stored in AWS S3.
    AwsS3(S3Object),
}
792
793impl OneshotObject for ObjectKind {
794    fn name(&self) -> &str {
795        match self {
796            ObjectKind::Http(object) => object.name(),
797            ObjectKind::AwsS3(object) => object.name(),
798        }
799    }
800
801    fn path(&self) -> &str {
802        match self {
803            ObjectKind::Http(object) => object.path(),
804            ObjectKind::AwsS3(object) => object.path(),
805        }
806    }
807
808    fn size(&self) -> usize {
809        match self {
810            ObjectKind::Http(object) => object.size(),
811            ObjectKind::AwsS3(object) => object.size(),
812        }
813    }
814
815    fn encodings(&self) -> &[Encoding] {
816        match self {
817            ObjectKind::Http(object) => object.encodings(),
818            ObjectKind::AwsS3(object) => object.encodings(),
819        }
820    }
821}
822
/// Enum wrapper for [`OneshotSource::Checksum`], see [`SourceKind`] for more details.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) enum ChecksumKind {
    // Checksum provided by an HTTP source.
    Http(HttpChecksum),
    // Checksum provided by an AWS S3 source.
    AwsS3(S3Checksum),
}
829
/// Defines a format that we fetch for a "one time" ingestion.
pub trait OneshotFormat: Clone {
    /// A single unit of work for decoding this format, e.g. a single Parquet RowGroup.
    type WorkRequest<S>: Debug + Clone + Send + Serialize + DeserializeOwned + 'static
    where
        S: OneshotSource;
    /// A chunk of records in this format that can be decoded into Rows.
    type RecordChunk: Debug + Clone + Send + Serialize + DeserializeOwned + 'static;

    /// Given an upstream object, defines how we should parse this object in parallel.
    ///
    /// Note: It's totally fine to not process an object in parallel, and just return a single
    /// [`Self::WorkRequest`] here.
    fn split_work<S: OneshotSource + Send>(
        &self,
        source: S,
        object: S::Object,
        checksum: S::Checksum,
    ) -> impl Future<Output = Result<Vec<Self::WorkRequest<S>>, StorageErrorX>> + Send;

    /// Given a work request, fetch data from the [`OneshotSource`] and return it in a format that
    /// can later be decoded.
    fn fetch_work<'a, S: OneshotSource + Sync + 'static>(
        &'a self,
        source: &'a S,
        request: Self::WorkRequest<S>,
    ) -> BoxStream<'a, Result<Self::RecordChunk, StorageErrorX>>;

    /// Decode a chunk of records into [`Row`]s.
    ///
    /// Rows are appended to `rows`; the returned `usize` is presumably the
    /// number of rows decoded from `chunk` — confirm against implementors.
    fn decode_chunk(
        &self,
        chunk: Self::RecordChunk,
        rows: &mut Vec<Row>,
    ) -> Result<usize, StorageErrorX>;
}
865
/// An enum wrapper around [`OneshotFormat`]s.
///
/// An alternative to this wrapper would be to use `Box<dyn OneshotFormat>`, but that requires
/// making the trait object safe and it's easier to just wrap it in an enum. Also, this wrapper
/// provides a convenient place to add [`StorageErrorXContext::context`] for all of our format
/// types.
#[derive(Clone, Debug)]
pub(crate) enum FormatKind {
    // CSV parsing.
    Csv(CsvDecoder),
    // Apache Parquet parsing.
    Parquet(ParquetFormat),
}
877
impl OneshotFormat for FormatKind {
    // Generic associated type: the request carries the source's object and
    // checksum types so it can be serialized and shipped between workers.
    type WorkRequest<S>
        = RequestKind<S::Object, S::Checksum>
    where
        S: OneshotSource;
    type RecordChunk = RecordChunkKind;

    // Delegate to the wrapped format, wrap each resulting request in the
    // matching `RequestKind` variant, and tag errors with the format name.
    async fn split_work<S: OneshotSource + Send>(
        &self,
        source: S,
        object: S::Object,
        checksum: S::Checksum,
    ) -> Result<Vec<Self::WorkRequest<S>>, StorageErrorX> {
        match self {
            FormatKind::Csv(csv) => {
                let work = csv
                    .split_work(source, object, checksum)
                    .await
                    .context("csv")?
                    .into_iter()
                    .map(RequestKind::Csv)
                    .collect();
                Ok(work)
            }
            FormatKind::Parquet(parquet) => {
                let work = parquet
                    .split_work(source, object, checksum)
                    .await
                    .context("parquet")?
                    .into_iter()
                    .map(RequestKind::Parquet)
                    .collect();
                Ok(work)
            }
        }
    }

    // Delegate to the wrapped format; requests are produced by `split_work` on
    // the same format kind, so the mixed (format, request) arms are unreachable.
    fn fetch_work<'a, S: OneshotSource + Sync + 'static>(
        &'a self,
        source: &'a S,
        request: Self::WorkRequest<S>,
    ) -> BoxStream<'a, Result<Self::RecordChunk, StorageErrorX>> {
        match (self, request) {
            (FormatKind::Csv(csv), RequestKind::Csv(request)) => csv
                .fetch_work(source, request)
                .map_ok(RecordChunkKind::Csv)
                .map(|result| result.context("csv"))
                .boxed(),
            (FormatKind::Parquet(parquet), RequestKind::Parquet(request)) => parquet
                .fetch_work(source, request)
                .map_ok(RecordChunkKind::Parquet)
                .map(|result| result.context("parquet"))
                .boxed(),
            (FormatKind::Parquet(_), RequestKind::Csv(_))
            | (FormatKind::Csv(_), RequestKind::Parquet(_)) => {
                unreachable!("programming error, {self:?}")
            }
        }
    }

    // Delegate to the wrapped format; chunks are produced by `fetch_work` on
    // the same format kind, so the mixed (format, chunk) arms are unreachable.
    fn decode_chunk(
        &self,
        chunk: Self::RecordChunk,
        rows: &mut Vec<Row>,
    ) -> Result<usize, StorageErrorX> {
        match (self, chunk) {
            (FormatKind::Csv(csv), RecordChunkKind::Csv(chunk)) => {
                csv.decode_chunk(chunk, rows).context("csv")
            }
            (FormatKind::Parquet(parquet), RecordChunkKind::Parquet(chunk)) => {
                parquet.decode_chunk(chunk, rows).context("parquet")
            }
            (FormatKind::Parquet(_), RecordChunkKind::Csv(_))
            | (FormatKind::Csv(_), RecordChunkKind::Parquet(_)) => {
                unreachable!("programming error, {self:?}")
            }
        }
    }
}
957
// Enum wrapper for [`OneshotFormat::WorkRequest`], one variant per supported format.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub(crate) enum RequestKind<O, C> {
    Csv(CsvWorkRequest<O, C>),
    Parquet(ParquetWorkRequest<O, C>),
}
963
// Enum wrapper for [`OneshotFormat::RecordChunk`], one variant per supported format.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub(crate) enum RecordChunkKind {
    Csv(CsvRecord),
    Parquet(ParquetRowGroup),
}
969
// Filter that decides which discovered objects should be ingested.
pub(crate) enum ObjectFilter {
    // Include every object.
    None,
    // Include only objects whose path is in this exact set.
    Files(BTreeSet<Box<str>>),
    // Include objects whose path matches this glob pattern.
    Pattern(glob::Pattern),
}
975
976impl ObjectFilter {
977    pub fn try_new(filter: ContentFilter) -> Result<Self, anyhow::Error> {
978        match filter {
979            ContentFilter::None => Ok(ObjectFilter::None),
980            ContentFilter::Files(files) => {
981                let files = files.into_iter().map(|f| f.into()).collect();
982                Ok(ObjectFilter::Files(files))
983            }
984            ContentFilter::Pattern(pattern) => {
985                let pattern = glob::Pattern::new(&pattern)?;
986                Ok(ObjectFilter::Pattern(pattern))
987            }
988        }
989    }
990
991    /// Returns if the object should be included.
992    pub fn filter<S: OneshotSource>(&self, object: &S::Object) -> bool {
993        match self {
994            ObjectFilter::None => true,
995            ObjectFilter::Files(files) => files.contains(object.path()),
996            ObjectFilter::Pattern(pattern) => pattern.matches(object.path()),
997        }
998    }
999}
1000
/// Experimental Error Type.
///
/// The goal of this type is to combine concepts from both `thiserror` and
/// `anyhow`. Having "strongly typed" errors from `thiserror` is useful for
/// determining what action to take and tracking the context of an error like
/// `anyhow` is useful for determining where an error came from.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct StorageErrorX {
    // The underlying, strongly typed, error.
    kind: StorageErrorXKind,
    // Human-readable context entries, appended as the error propagates upward.
    context: LinkedList<String>,
}
1012
1013impl fmt::Display for StorageErrorX {
1014    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1015        writeln!(f, "error: {}", self.kind)?;
1016        writeln!(f, "causes: {:?}", self.context)?;
1017        Ok(())
1018    }
1019}
1020
/// Experimental Error Type, see [`StorageErrorX`].
// NOTE(review): `AwsS3Request` carries a `String` while every other string-like
// variant carries an `Arc<str>` — consider unifying the payload types.
#[derive(Debug, Clone, Serialize, Deserialize, thiserror::Error)]
pub enum StorageErrorXKind {
    #[error("csv decoding error: {0}")]
    CsvDecoding(Arc<str>),
    #[error("parquet error: {0}")]
    ParquetError(Arc<str>),
    #[error("reqwest error: {0}")]
    Reqwest(Arc<str>),
    #[error("aws s3 request error: {0}")]
    AwsS3Request(String),
    #[error("aws s3 bytestream error: {0}")]
    AwsS3Bytes(Arc<str>),
    #[error("invalid reqwest header: {0}")]
    InvalidHeader(Arc<str>),
    #[error("failed to decode Row from a record batch: {0}")]
    InvalidRecordBatch(Arc<str>),
    #[error("programming error: {0}")]
    ProgrammingError(Arc<str>),
    #[error("failed to get the size of an object")]
    MissingSize,
    #[error("object is missing the required '{0}' field")]
    MissingField(Arc<str>),
    #[error("failed while evaluating the provided mfp: '{0}'")]
    MfpEvalError(Arc<str>),
    #[error("no matching files found at the given url")]
    NoMatchingFiles,
    // Catch-all for errors that don't fit a more specific variant.
    #[error("something went wrong: {0}")]
    Generic(String),
}
1051
1052impl From<csv_async::Error> for StorageErrorXKind {
1053    fn from(err: csv_async::Error) -> Self {
1054        StorageErrorXKind::CsvDecoding(err.to_string().into())
1055    }
1056}
1057
1058impl From<reqwest::Error> for StorageErrorXKind {
1059    fn from(err: reqwest::Error) -> Self {
1060        StorageErrorXKind::Reqwest(err.to_string().into())
1061    }
1062}
1063
1064impl From<reqwest::header::ToStrError> for StorageErrorXKind {
1065    fn from(err: reqwest::header::ToStrError) -> Self {
1066        StorageErrorXKind::InvalidHeader(err.to_string().into())
1067    }
1068}
1069
1070impl From<aws_smithy_types::byte_stream::error::Error> for StorageErrorXKind {
1071    fn from(err: aws_smithy_types::byte_stream::error::Error) -> Self {
1072        StorageErrorXKind::AwsS3Request(err.to_string())
1073    }
1074}
1075
1076impl From<::parquet::errors::ParquetError> for StorageErrorXKind {
1077    fn from(err: ::parquet::errors::ParquetError) -> Self {
1078        StorageErrorXKind::ParquetError(err.to_string().into())
1079    }
1080}
1081
1082impl StorageErrorXKind {
1083    pub fn with_context<C: Display>(self, context: C) -> StorageErrorX {
1084        StorageErrorX {
1085            kind: self,
1086            context: LinkedList::from([context.to_string()]),
1087        }
1088    }
1089
1090    pub fn invalid_record_batch<S: Into<Arc<str>>>(error: S) -> StorageErrorXKind {
1091        StorageErrorXKind::InvalidRecordBatch(error.into())
1092    }
1093
1094    pub fn generic<C: Display>(error: C) -> StorageErrorXKind {
1095        StorageErrorXKind::Generic(error.to_string())
1096    }
1097
1098    pub fn programming_error<S: Into<Arc<str>>>(error: S) -> StorageErrorXKind {
1099        StorageErrorXKind::ProgrammingError(error.into())
1100    }
1101}
1102
1103impl<E> From<E> for StorageErrorX
1104where
1105    E: Into<StorageErrorXKind>,
1106{
1107    fn from(err: E) -> Self {
1108        StorageErrorX {
1109            kind: err.into(),
1110            context: LinkedList::new(),
1111        }
1112    }
1113}
1114
// Extension trait mirroring `anyhow::Context`: attach a human-readable context
// entry to the error side of a `Result`.
trait StorageErrorXContext<T> {
    fn context<C>(self, context: C) -> Result<T, StorageErrorX>
    where
        C: Display;
}
1120
1121impl<T, E> StorageErrorXContext<T> for Result<T, E>
1122where
1123    E: Into<StorageErrorXKind>,
1124{
1125    fn context<C>(self, context: C) -> Result<T, StorageErrorX>
1126    where
1127        C: Display,
1128    {
1129        match self {
1130            Ok(val) => Ok(val),
1131            Err(kind) => Err(StorageErrorX {
1132                kind: kind.into(),
1133                context: LinkedList::from([context.to_string()]),
1134            }),
1135        }
1136    }
1137}
1138
1139impl<T> StorageErrorXContext<T> for Result<T, StorageErrorX> {
1140    fn context<C>(self, context: C) -> Result<T, StorageErrorX>
1141    where
1142        C: Display,
1143    {
1144        match self {
1145            Ok(val) => Ok(val),
1146            Err(mut e) => {
1147                e.context.push_back(context.to_string());
1148                Err(e)
1149            }
1150        }
1151    }
1152}