// mz_storage_operators/oneshot_source.rs
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! "Oneshot" sources are a one-time ingestion of data from an external system; unlike traditional
//! sources, they __do not__ run continuously. Oneshot sources are generally used for `COPY FROM`
//! SQL statements.
//!
//! The implementation of reading and parsing data is behind the [`OneshotSource`] and
//! [`OneshotFormat`] traits, respectively. Users looking to add new sources or formats should
//! only need to add new implementations for these traits.
//!
//! * [`OneshotSource`] is an interface for listing and reading from an external system, e.g. an
//!   HTTP server.
//! * [`OneshotFormat`] is an interface for how to parallelize and parse data, e.g. CSV.
//!
//! Given a [`OneshotSource`] and a [`OneshotFormat`] we build a dataflow structured like the
//! following:
//!
//! ```text
//!             ┏━━━━━━━━━━━━━━━┓
//!             ┃    Discover   ┃
//!             ┃    objects    ┃
//!             ┗━━━━━━━┯━━━━━━━┛
//!           ┌───< Distribute >───┐
//!           │                    │
//!     ┏━━━━━v━━━━┓         ┏━━━━━v━━━━┓
//!     ┃  Split   ┃   ...   ┃  Split   ┃
//!     ┃  Work 1  ┃         ┃  Work n  ┃
//!     ┗━━━━━┯━━━━┛         ┗━━━━━┯━━━━┛
//!           │                    │
//!           ├───< Distribute >───┤
//!           │                    │
//!     ┏━━━━━v━━━━┓         ┏━━━━━v━━━━┓
//!     ┃  Fetch   ┃   ...   ┃  Fetch   ┃
//!     ┃  Work 1  ┃         ┃  Work n  ┃
//!     ┗━━━━━┯━━━━┛         ┗━━━━━┯━━━━┛
//!           │                    │
//!           ├───< Distribute >───┤
//!           │                    │
//!     ┏━━━━━v━━━━┓         ┏━━━━━v━━━━┓
//!     ┃  Decode  ┃   ...   ┃  Decode  ┃
//!     ┃  Chunk 1 ┃         ┃  Chunk n ┃
//!     ┗━━━━━┯━━━━┛         ┗━━━━━┯━━━━┛
//!           │                    │
//!           │                    │
//!     ┏━━━━━v━━━━┓         ┏━━━━━v━━━━┓
//!     ┃  Stage   ┃   ...   ┃  Stage   ┃
//!     ┃  Batch 1 ┃         ┃  Batch n ┃
//!     ┗━━━━━┯━━━━┛         ┗━━━━━┯━━━━┛
//!           │                    │
//!           └─────────┬──────────┘
//!               ┏━━━━━v━━━━┓
//!               ┃  Result  ┃
//!               ┃ Callback ┃
//!               ┗━━━━━━━━━━┛
//! ```
//!
use std::collections::{BTreeSet, LinkedList};
use std::fmt;
use std::fmt::{Debug, Display};
use std::future::Future;
use std::sync::Arc;

use bytes::Bytes;
use differential_dataflow::Hashable;
use futures::stream::BoxStream;
use futures::{StreamExt, TryStreamExt};
use mz_expr::SafeMfpPlan;
use mz_ore::cast::CastFrom;
use mz_persist_client::Diagnostics;
use mz_persist_client::batch::ProtoBatch;
use mz_persist_client::cache::PersistClientCache;
use mz_persist_types::Codec;
use mz_persist_types::codec_impls::UnitSchema;
use mz_repr::{DatumVec, GlobalId, Row, RowArena, Timestamp};
use mz_storage_types::StorageDiff;
use mz_storage_types::connections::ConnectionContext;
use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::oneshot_sources::{
    ContentFilter, ContentFormat, ContentSource, OneshotIngestionRequest,
};
use mz_storage_types::sources::SourceData;
use mz_timely_util::builder_async::{
    Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
};
use mz_timely_util::pact::Distribute;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use timely::container::CapacityContainerBuilder;
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::{Scope, Stream as TimelyStream};
use timely::progress::Antichain;
use tracing::info;

use crate::oneshot_source::aws_source::{AwsS3Source, S3Checksum, S3Object};
use crate::oneshot_source::csv::{CsvDecoder, CsvRecord, CsvWorkRequest};
use crate::oneshot_source::http_source::{HttpChecksum, HttpObject, HttpOneshotSource};
use crate::oneshot_source::parquet::{ParquetFormat, ParquetRowGroup, ParquetWorkRequest};

pub mod csv;
pub mod parquet;

pub mod aws_source;
pub mod http_source;

mod util;
115/// Render a dataflow to do a "oneshot" ingestion.
116///
117/// Roughly the operators we render do the following:
118///
119/// 1. Discover objects with a [`OneshotSource`].
120/// 2. Split objects into separate units of work based on the [`OneshotFormat`].
121/// 3. Fetch individual units of work (aka fetch byte blobs) with the
122///    [`OneshotFormat`] and [`OneshotSource`].
123/// 4. Decode the fetched byte blobs into [`Row`]s.
124/// 5. Stage the [`Row`]s into Persist returning [`ProtoBatch`]es.
125///
126/// TODO(cf3): Benchmark combining operators 3, 4, and 5. Currently we keep them
127/// separate for the [`CsvDecoder`]. CSV decoding is hard to do in parallel so we
128/// currently have a single worker Fetch an entire file, and then distributes
129/// chunks for parallel Decoding. We should benchmark if this is actually faster
130/// than just a single worker both fetching and decoding.
131///
132pub fn render<G, F>(
133    scope: G,
134    persist_clients: Arc<PersistClientCache>,
135    connection_context: ConnectionContext,
136    collection_id: GlobalId,
137    collection_meta: CollectionMetadata,
138    request: OneshotIngestionRequest,
139    worker_callback: F,
140) -> Vec<PressOnDropButton>
141where
142    G: Scope<Timestamp = Timestamp>,
143    F: FnOnce(Result<Option<ProtoBatch>, String>) -> () + 'static,
144{
145    let OneshotIngestionRequest {
146        source,
147        format,
148        filter,
149        shape,
150    } = request;
151
152    let source = match source {
153        ContentSource::Http { url } => {
154            let source = HttpOneshotSource::new(reqwest::Client::default(), url);
155            SourceKind::Http(source)
156        }
157        ContentSource::AwsS3 {
158            connection,
159            connection_id,
160            uri,
161        } => {
162            let source = AwsS3Source::new(connection, connection_id, connection_context, uri);
163            SourceKind::AwsS3(source)
164        }
165    };
166    tracing::info!(?source, "created oneshot source");
167
168    let format = match format {
169        ContentFormat::Csv(params) => {
170            let format = CsvDecoder::new(params, &shape.source_desc);
171            FormatKind::Csv(format)
172        }
173        ContentFormat::Parquet => {
174            let format = ParquetFormat::new(shape.source_desc);
175            FormatKind::Parquet(format)
176        }
177    };
178
179    // Discover what objects are available to copy.
180    let (objects_stream, discover_token) =
181        render_discover_objects(scope.clone(), collection_id, source.clone(), filter);
182    // Split the objects into individual units of work.
183    let (work_stream, split_token) = render_split_work(
184        scope.clone(),
185        collection_id,
186        &objects_stream,
187        source.clone(),
188        format.clone(),
189    );
190    // Fetch each unit of work, returning chunks of records.
191    let (records_stream, fetch_token) = render_fetch_work(
192        scope.clone(),
193        collection_id,
194        source.clone(),
195        format.clone(),
196        &work_stream,
197    );
198    // Parse chunks of records into Rows.
199    let (rows_stream, decode_token) = render_decode_chunk(
200        scope.clone(),
201        format.clone(),
202        &records_stream,
203        shape.source_mfp,
204    );
205    // Stage the Rows in Persist.
206    let (batch_stream, batch_token) = render_stage_batches_operator(
207        scope.clone(),
208        collection_id,
209        &collection_meta,
210        persist_clients,
211        &rows_stream,
212    );
213
214    // Collect all results together and notify the upstream of whether or not we succeeded.
215    render_completion_operator(scope, &batch_stream, worker_callback);
216
217    let tokens = vec![
218        discover_token,
219        split_token,
220        fetch_token,
221        decode_token,
222        batch_token,
223    ];
224
225    tokens
226}
227
/// Render an operator that using a [`OneshotSource`] will discover what objects are available
/// for fetching.
///
/// Only one worker (chosen by hashing `collection_id`) actually lists the source; all other
/// workers emit nothing. Returns the stream of discovered `(object, checksum)` pairs (or a
/// single error) and a token that keeps the operator alive.
pub fn render_discover_objects<G, S>(
    scope: G,
    collection_id: GlobalId,
    source: S,
    filter: ContentFilter,
) -> (
    TimelyStream<G, Result<(S::Object, S::Checksum), StorageErrorX>>,
    PressOnDropButton,
)
where
    G: Scope<Timestamp = Timestamp>,
    S: OneshotSource + 'static,
{
    // Only a single worker is responsible for discovering objects.
    //
    // The active worker is picked by hashing the collection ID, so the role is spread
    // across workers over many different ingestions.
    let worker_id = scope.index();
    let num_workers = scope.peers();
    let active_worker_id = usize::cast_from((collection_id, "discover").hashed()) % num_workers;
    let is_active_worker = worker_id == active_worker_id;

    let mut builder = AsyncOperatorBuilder::new("CopyFrom-discover".to_string(), scope.clone());

    let (start_handle, start_stream) = builder.new_output::<CapacityContainerBuilder<_>>();

    let shutdown = builder.build(move |caps| async move {
        let [start_cap] = caps.try_into().unwrap();

        // Everyone but the active worker returns immediately, emitting nothing.
        if !is_active_worker {
            return;
        }

        // Build the object filter up front; a bad filter fails the whole ingestion.
        let filter = match ObjectFilter::try_new(filter) {
            Ok(filter) => filter,
            Err(err) => {
                tracing::warn!(?err, "failed to create filter");
                start_handle.give(&start_cap, Err(StorageErrorXKind::generic(err).into()));
                return;
            }
        };

        // List every object in the source, then partition by the user-provided filter.
        let work = source.list().await.context("list");
        match work {
            Ok(objects) => {
                let (include, exclude): (Vec<_>, Vec<_>) = objects
                    .into_iter()
                    .partition(|(o, _checksum)| filter.filter::<S>(o));
                tracing::info!(%worker_id, ?include, ?exclude, "listed objects");

                // Only objects matching the filter flow downstream.
                include
                    .into_iter()
                    .for_each(|object| start_handle.give(&start_cap, Ok(object)))
            }
            Err(err) => {
                // Listing failed; forward the error so downstream can report it.
                tracing::warn!(?err, "failed to list oneshot source");
                start_handle.give(&start_cap, Err(err))
            }
        }
    });

    (start_stream, shutdown.press_on_drop())
}
290
/// Render an operator that given a stream of [`OneshotSource::Object`]s will split them into units
/// of work based on the provided [`OneshotFormat`].
///
/// Returns the stream of work requests (or errors) and a token that keeps the operator alive.
pub fn render_split_work<G, S, F>(
    scope: G,
    collection_id: GlobalId,
    objects: &TimelyStream<G, Result<(S::Object, S::Checksum), StorageErrorX>>,
    source: S,
    format: F,
) -> (
    TimelyStream<G, Result<F::WorkRequest<S>, StorageErrorX>>,
    PressOnDropButton,
)
where
    G: Scope,
    S: OneshotSource + Send + Sync + 'static,
    F: OneshotFormat + Send + Sync + 'static,
{
    let worker_id = scope.index();
    let mut builder = AsyncOperatorBuilder::new("CopyFrom-split_work".to_string(), scope.clone());

    let (request_handle, request_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
    // `Distribute` spreads the discovered objects across workers.
    let mut objects_handle = builder.new_input_for(objects, Distribute, &request_handle);

    let shutdown = builder.build(move |caps| async move {
        let [_objects_cap] = caps.try_into().unwrap();

        info!(%collection_id, %worker_id, "CopyFrom Split Work");

        while let Some(event) = objects_handle.next().await {
            let (capability, maybe_objects) = match event {
                AsyncEvent::Data(cap, req) => (cap, req),
                AsyncEvent::Progress(_) => continue,
            };

            // Nest the `split_work(...)` method in an async-block so we can use the `?`
            // without returning from the entire operator, and so we can add context.
            let result = async {
                let mut requests = Vec::new();

                for maybe_object in maybe_objects {
                    // Return early if the upstream Discover step failed.
                    let (object, checksum) = maybe_object?;

                    let format_ = format.clone();
                    let source_ = source.clone();
                    // Run the splitting on a separate task; `?` propagates both a failed
                    // spawn and a failed `split_work`.
                    let work_requests = mz_ore::task::spawn(|| "split-work", async move {
                        info!(%worker_id, object = %object.name(), "splitting object");
                        format_.split_work(source_.clone(), object, checksum).await
                    })
                    .await?;

                    requests.extend(work_requests);
                }

                Ok::<_, StorageErrorX>(requests)
            }
            .await
            .context("split");

            // Emit either all of the work requests, or a single error, at this capability.
            match result {
                Ok(requests) => requests
                    .into_iter()
                    .for_each(|req| request_handle.give(&capability, Ok(req))),
                Err(err) => request_handle.give(&capability, Err(err)),
            }
        }
    });

    (request_stream, shutdown.press_on_drop())
}
361
/// Render an operator that given a stream [`OneshotFormat::WorkRequest`]s will fetch chunks of the
/// remote [`OneshotSource::Object`] and return a stream of [`OneshotFormat::RecordChunk`]s that
/// can be decoded into [`Row`]s.
///
/// Returns the stream of record chunks (or errors) and a token that keeps the operator alive.
pub fn render_fetch_work<G, S, F>(
    scope: G,
    collection_id: GlobalId,
    source: S,
    format: F,
    work_requests: &TimelyStream<G, Result<F::WorkRequest<S>, StorageErrorX>>,
) -> (
    TimelyStream<G, Result<F::RecordChunk, StorageErrorX>>,
    PressOnDropButton,
)
where
    G: Scope,
    S: OneshotSource + Sync + 'static,
    F: OneshotFormat + Sync + 'static,
{
    let worker_id = scope.index();
    let mut builder = AsyncOperatorBuilder::new("CopyFrom-fetch_work".to_string(), scope.clone());

    let (record_handle, record_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
    // `Distribute` spreads the work requests across workers.
    let mut work_requests_handle = builder.new_input_for(work_requests, Distribute, &record_handle);

    let shutdown = builder.build(move |caps| async move {
        let [_work_cap] = caps.try_into().unwrap();

        info!(%collection_id, %worker_id, "CopyFrom Fetch Work");

        while let Some(event) = work_requests_handle.next().await {
            let (capability, maybe_requests) = match event {
                AsyncEvent::Data(cap, req) => (cap, req),
                AsyncEvent::Progress(_) => continue,
            };

            // Wrap our work in a block to capture `?`.
            let result = async {
                // Process each stream of work, one at a time.
                for maybe_request in maybe_requests {
                    let request = maybe_request?;

                    let mut work_stream = format.fetch_work(&source, request);
                    while let Some(result) = work_stream.next().await {
                        // Returns early and stop consuming from the stream if we hit an error.
                        let record_chunk = result.context("fetch worker")?;

                        // Note: We want to give record chunks as we receive them, because a work
                        // request may be for an entire file.
                        //
                        // TODO(cf3): Investigate if some small batching here would help perf.
                        record_handle.give(&capability, Ok(record_chunk));
                    }
                }

                Ok::<_, StorageErrorX>(())
            }
            .await
            .context("fetch work");

            // Forward any error downstream so the overall ingestion can be failed.
            if let Err(err) = result {
                tracing::warn!(?err, "failed to fetch");
                record_handle.give(&capability, Err(err))
            }
        }
    });

    (record_stream, shutdown.press_on_drop())
}
430
/// Render an operator that given a stream of [`OneshotFormat::RecordChunk`]s will decode these
/// chunks into a stream of [`Row`]s.
///
/// Each decoded row is passed through `mfp` to re-arrange columns and fill in defaults before
/// being emitted. Returns the stream of rows (or errors) and a token that keeps the operator
/// alive.
pub fn render_decode_chunk<G, F>(
    scope: G,
    format: F,
    record_chunks: &TimelyStream<G, Result<F::RecordChunk, StorageErrorX>>,
    mfp: SafeMfpPlan,
) -> (
    TimelyStream<G, Result<Row, StorageErrorX>>,
    PressOnDropButton,
)
where
    G: Scope,
    F: OneshotFormat + 'static,
{
    let mut builder = AsyncOperatorBuilder::new("CopyFrom-decode_chunk".to_string(), scope.clone());

    let (row_handle, row_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
    // `Distribute` spreads the record chunks across workers for parallel decoding.
    let mut record_chunk_handle = builder.new_input_for(record_chunks, Distribute, &row_handle);

    let shutdown = builder.build(move |caps| async move {
        let [_row_cap] = caps.try_into().unwrap();

        // Scratch space reused across all MFP evaluations to avoid re-allocating per row.
        let mut datum_vec = DatumVec::default();
        let row_arena = RowArena::default();
        let mut row_buf = Row::default();

        while let Some(event) = record_chunk_handle.next().await {
            let (capability, maybe_chunks) = match event {
                AsyncEvent::Data(cap, data) => (cap, data),
                AsyncEvent::Progress(_) => continue,
            };

            // Wrap decoding in an async block so `?` aborts just this batch of chunks.
            let result = async {
                let mut rows = Vec::new();
                for maybe_chunk in maybe_chunks {
                    let chunk = maybe_chunk?;
                    format.decode_chunk(chunk, &mut rows)?;
                }
                Ok::<_, StorageErrorX>(rows)
            }
            .await
            .context("decode chunk");

            match result {
                Ok(rows) => {
                    // For each row of source data, we pass it through an MFP to re-arrange column
                    // orders and/or fill in default values for missing columns.
                    for row in rows {
                        let mut datums = datum_vec.borrow_with(&row);
                        let result = mfp
                            .evaluate_into(&mut *datums, &row_arena, &mut row_buf)
                            .map(|row| row.cloned());

                        match result {
                            Ok(Some(row)) => row_handle.give(&capability, Ok(row)),
                            Ok(None) => {
                                // We only expect the provided MFP to map rows from the source data
                                // and project default values.
                                mz_ore::soft_panic_or_log!("oneshot source MFP filtered out data!");
                            }
                            Err(e) => {
                                // MFP evaluation failures become downstream errors, not panics.
                                let err = StorageErrorXKind::MfpEvalError(e.to_string().into())
                                    .with_context("decode");
                                row_handle.give(&capability, Err(err))
                            }
                        }
                    }
                }
                Err(err) => row_handle.give(&capability, Err(err)),
            }
        }
    });

    (row_stream, shutdown.press_on_drop())
}
507
/// Render an operator that given a stream of [`Row`]s will stage them in Persist and return a
/// stream of [`ProtoBatch`]es that can later be linked into a shard.
///
/// If any upstream error arrives, the in-progress batch is deleted (so we don't leak staged
/// data) and the error is forwarded downstream instead.
pub fn render_stage_batches_operator<G>(
    scope: G,
    collection_id: GlobalId,
    collection_meta: &CollectionMetadata,
    persist_clients: Arc<PersistClientCache>,
    rows_stream: &TimelyStream<G, Result<Row, StorageErrorX>>,
) -> (
    TimelyStream<G, Result<ProtoBatch, StorageErrorX>>,
    PressOnDropButton,
)
where
    G: Scope,
{
    let persist_location = collection_meta.persist_location.clone();
    let shard_id = collection_meta.data_shard;
    let collection_desc = Arc::new(collection_meta.relation_desc.clone());

    let mut builder =
        AsyncOperatorBuilder::new("CopyFrom-stage_batches".to_string(), scope.clone());

    let (proto_batch_handle, proto_batch_stream) =
        builder.new_output::<CapacityContainerBuilder<_>>();
    // `Pipeline` (not `Distribute`): rows were already spread across workers by the decode
    // operator, each worker stages its own batch.
    let mut rows_handle = builder.new_input_for(rows_stream, Pipeline, &proto_batch_handle);

    let shutdown = builder.build(move |caps| async move {
        let [proto_batch_cap] = caps.try_into().unwrap();

        // Open a Persist handle that we can use to stage a batch.
        let persist_client = persist_clients
            .open(persist_location)
            .await
            .expect("failed to open Persist client");
        let persist_diagnostics = Diagnostics {
            shard_name: collection_id.to_string(),
            handle_purpose: "CopyFrom::stage_batches".to_string(),
        };
        let write_handle = persist_client
            .open_writer::<SourceData, (), mz_repr::Timestamp, StorageDiff>(
                shard_id,
                Arc::clone(&collection_desc),
                Arc::new(UnitSchema),
                persist_diagnostics,
            )
            .await
            .expect("could not open Persist shard");

        // Create a batch using the minimum timestamp since these batches will
        // get sent back to `environmentd` and their timestamps re-written
        // before being finally appended.
        let lower = mz_repr::Timestamp::MIN;
        let upper = Antichain::from_elem(lower.step_forward());

        let mut batch_builder = write_handle.builder(Antichain::from_elem(lower));

        while let Some(event) = rows_handle.next().await {
            let AsyncEvent::Data(_, row_batch) = event else {
                continue;
            };

            // Pull Rows off our stream and stage them into a Batch.
            for maybe_row in row_batch {
                // Validate each Row against the collection's schema before staging it.
                let maybe_row = maybe_row.and_then(|row| {
                    Row::validate(&row, &*collection_desc).map_err(|e| {
                        StorageErrorXKind::invalid_record_batch(e).with_context("stage_batches")
                    })?;
                    Ok(row)
                });
                match maybe_row {
                    // Happy path, add the Row to our batch!
                    Ok(row) => {
                        let data = SourceData(Ok(row));
                        batch_builder
                            .add(&data, &(), &lower, &1)
                            .await
                            .expect("failed to add Row to batch");
                    }
                    // Sad path, something upstream hit an error.
                    Err(err) => {
                        // Clean up our in-progress batch so we don't leak data.
                        let batch = batch_builder
                            .finish(upper)
                            .await
                            .expect("failed to cleanup batch");
                        batch.delete().await;

                        // Pass on the error.
                        proto_batch_handle
                            .give(&proto_batch_cap, Err(err).context("stage batches"));
                        return;
                    }
                }
            }
        }

        let batch = batch_builder
            .finish(upper)
            .await
            .expect("failed to create Batch");

        // Turn our Batch into a ProtoBatch that will later be linked in to
        // the shard.
        //
        // Note: By turning this into a ProtoBatch, the onus is now on us to
        // cleanup the Batch if it's never linked into the shard.
        //
        // TODO(cf2): Make sure these batches get cleaned up if another
        // worker encounters an error.
        let proto_batch = batch.into_transmittable_batch();
        proto_batch_handle.give(&proto_batch_cap, Ok(proto_batch));
    });

    (proto_batch_stream, shutdown.press_on_drop())
}
623
/// Render an operator that given a stream of [`ProtoBatch`]es will call our `worker_callback` to
/// report the results upstream.
///
/// On success the callback receives `Ok(Some(batch))` (or `Ok(None)` if this worker staged no
/// batch); an upstream error is stringified and passed as `Err`.
pub fn render_completion_operator<G, F>(
    scope: G,
    results_stream: &TimelyStream<G, Result<ProtoBatch, StorageErrorX>>,
    worker_callback: F,
) where
    G: Scope,
    F: FnOnce(Result<Option<ProtoBatch>, String>) -> () + 'static,
{
    let mut builder = AsyncOperatorBuilder::new("CopyFrom-completion".to_string(), scope.clone());
    // Disconnected input: this operator produces no dataflow output, only the callback.
    let mut results_input = builder.new_disconnected_input(results_stream, Pipeline);

    builder.build(move |_| async move {
        // Wrap in an async block so `?` can short-circuit on the first upstream error.
        let result = async move {
            let mut maybe_payload: Option<ProtoBatch> = None;

            while let Some(event) = results_input.next().await {
                if let AsyncEvent::Data(_cap, results) = event {
                    let [result] = results
                        .try_into()
                        .expect("only 1 event on the result stream");

                    // TODO(cf2): Lift this restriction.
                    if maybe_payload.is_some() {
                        panic!("expected only one batch!");
                    }

                    maybe_payload = Some(result.map_err(|e| e.to_string())?);
                }
            }

            Ok(maybe_payload)
        }
        .await;

        // Report to the caller of our final status.
        worker_callback(result);
    });
}
664
/// An object that will be fetched from a [`OneshotSource`].
pub trait OneshotObject {
    /// Name of the object, including any extensions.
    fn name(&self) -> &str;

    /// Path of the object within the remote source.
    fn path(&self) -> &str;

    /// Size of this object in bytes.
    fn size(&self) -> usize;

    /// Encodings of the _entire_ object, if any.
    ///
    /// Note: The object may internally use compression, e.g. a Parquet file
    /// could compress its column chunks, but if the Parquet file itself is not
    /// compressed then this would return `None`.
    fn encodings(&self) -> &[Encoding];
}
683
/// Encoding of a [`OneshotObject`].
#[derive(Debug, Copy, Clone, Serialize, Deserialize)]
pub enum Encoding {
    /// bzip2 compression.
    Bzip2,
    /// gzip compression.
    Gzip,
    /// xz (LZMA) compression.
    Xz,
    /// Zstandard compression.
    Zstd,
}
692
/// Defines a remote system that we can fetch data from for a "one time" ingestion.
pub trait OneshotSource: Clone + Send + Unpin {
    /// An individual unit within the source, e.g. a file.
    type Object: OneshotObject
        + Debug
        + Clone
        + Send
        + Unpin
        + Serialize
        + DeserializeOwned
        + 'static;
    /// Checksum for a [`Self::Object`].
    type Checksum: Debug + Clone + Send + Unpin + Serialize + DeserializeOwned + 'static;

    /// Returns all of the objects for this source.
    fn list<'a>(
        &'a self,
    ) -> impl Future<Output = Result<Vec<(Self::Object, Self::Checksum)>, StorageErrorX>> + Send;

    /// Returns a stream of the data for a specific object.
    ///
    /// When `range` is provided, only that (inclusive) byte range of the object is fetched.
    fn get<'s>(
        &'s self,
        object: Self::Object,
        checksum: Self::Checksum,
        range: Option<std::ops::RangeInclusive<usize>>,
    ) -> BoxStream<'s, Result<Bytes, StorageErrorX>>;
}
720
/// An enum wrapper around [`OneshotSource`]s.
///
/// An alternative to this wrapper would be to use `Box<dyn OneshotSource>`, but that requires
/// making the trait object safe and it's easier to just wrap it in an enum. Also, this wrapper
/// provides a convenient place to add [`StorageErrorXContext::context`] for all of our source
/// types.
#[derive(Clone, Debug)]
pub(crate) enum SourceKind {
    /// Fetch objects from an HTTP server.
    Http(HttpOneshotSource),
    /// Fetch objects from AWS S3.
    AwsS3(AwsS3Source),
}
732
733impl OneshotSource for SourceKind {
734    type Object = ObjectKind;
735    type Checksum = ChecksumKind;
736
737    async fn list<'a>(&'a self) -> Result<Vec<(Self::Object, Self::Checksum)>, StorageErrorX> {
738        match self {
739            SourceKind::Http(http) => {
740                let objects = http.list().await.context("http")?;
741                let objects = objects
742                    .into_iter()
743                    .map(|(object, checksum)| {
744                        (ObjectKind::Http(object), ChecksumKind::Http(checksum))
745                    })
746                    .collect();
747                Ok(objects)
748            }
749            SourceKind::AwsS3(s3) => {
750                let objects = s3.list().await.context("s3")?;
751                let objects = objects
752                    .into_iter()
753                    .map(|(object, checksum)| {
754                        (ObjectKind::AwsS3(object), ChecksumKind::AwsS3(checksum))
755                    })
756                    .collect();
757                Ok(objects)
758            }
759        }
760    }
761
762    fn get<'s>(
763        &'s self,
764        object: Self::Object,
765        checksum: Self::Checksum,
766        range: Option<std::ops::RangeInclusive<usize>>,
767    ) -> BoxStream<'s, Result<Bytes, StorageErrorX>> {
768        match (self, object, checksum) {
769            (SourceKind::Http(http), ObjectKind::Http(object), ChecksumKind::Http(checksum)) => {
770                http.get(object, checksum, range)
771                    .map(|result| result.context("http"))
772                    .boxed()
773            }
774            (SourceKind::AwsS3(s3), ObjectKind::AwsS3(object), ChecksumKind::AwsS3(checksum)) => s3
775                .get(object, checksum, range)
776                .map(|result| result.context("aws_s3"))
777                .boxed(),
778            (SourceKind::AwsS3(_) | SourceKind::Http(_), _, _) => {
779                unreachable!("programming error! wrong source, object, and checksum kind");
780            }
781        }
782    }
783}
784
/// Enum wrapper for [`OneshotSource::Object`], see [`SourceKind`] for more details.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) enum ObjectKind {
    /// An object served by an HTTP server.
    Http(HttpObject),
    /// An object stored in AWS S3.
    AwsS3(S3Object),
}
791
792impl OneshotObject for ObjectKind {
793    fn name(&self) -> &str {
794        match self {
795            ObjectKind::Http(object) => object.name(),
796            ObjectKind::AwsS3(object) => object.name(),
797        }
798    }
799
800    fn path(&self) -> &str {
801        match self {
802            ObjectKind::Http(object) => object.path(),
803            ObjectKind::AwsS3(object) => object.path(),
804        }
805    }
806
807    fn size(&self) -> usize {
808        match self {
809            ObjectKind::Http(object) => object.size(),
810            ObjectKind::AwsS3(object) => object.size(),
811        }
812    }
813
814    fn encodings(&self) -> &[Encoding] {
815        match self {
816            ObjectKind::Http(object) => object.encodings(),
817            ObjectKind::AwsS3(object) => object.encodings(),
818        }
819    }
820}
821
/// Enum wrapper for [`OneshotSource::Checksum`], see [`SourceKind`] for more details.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) enum ChecksumKind {
    /// Checksum reported by an HTTP server.
    Http(HttpChecksum),
    /// Checksum reported by AWS S3.
    AwsS3(S3Checksum),
}
828
/// Defines a format that we fetch for a "one time" ingestion.
pub trait OneshotFormat: Clone {
    /// A single unit of work for decoding this format, e.g. a single Parquet RowGroup.
    ///
    /// Generic over the [`OneshotSource`] so a request can carry the source's
    /// object and checksum types across workers (hence `Serialize`/`DeserializeOwned`).
    type WorkRequest<S>: Debug + Clone + Send + Serialize + DeserializeOwned + 'static
    where
        S: OneshotSource;
    /// A chunk of records in this format that can be decoded into Rows.
    type RecordChunk: Debug + Clone + Send + Serialize + DeserializeOwned + 'static;

    /// Given an upstream object, defines how we should parse this object in parallel.
    ///
    /// Note: It's totally fine to not process an object in parallel, and just return a single
    /// [`Self::WorkRequest`] here.
    fn split_work<S: OneshotSource + Send>(
        &self,
        source: S,
        object: S::Object,
        checksum: S::Checksum,
    ) -> impl Future<Output = Result<Vec<Self::WorkRequest<S>>, StorageErrorX>> + Send;

    /// Given a work request, fetch data from the [`OneshotSource`] and return it in a format that
    /// can later be decoded.
    fn fetch_work<'a, S: OneshotSource + Sync + 'static>(
        &'a self,
        source: &'a S,
        request: Self::WorkRequest<S>,
    ) -> BoxStream<'a, Result<Self::RecordChunk, StorageErrorX>>;

    /// Decode a chunk of records into [`Row`]s.
    ///
    /// Appends the decoded rows to `rows`; the `usize` in the result is
    /// presumably the number of rows decoded — TODO confirm with implementors.
    fn decode_chunk(
        &self,
        chunk: Self::RecordChunk,
        rows: &mut Vec<Row>,
    ) -> Result<usize, StorageErrorX>;
}
864
/// An enum wrapper around [`OneshotFormat`]s.
///
/// An alternative to this wrapper would be to use `Box<dyn OneshotFormat>`, but that requires
/// making the trait object safe and it's easier to just wrap it in an enum. Also, this wrapper
/// provides a convenient place to add [`StorageErrorXContext::context`] for all of our format
/// types.
#[derive(Clone, Debug)]
pub(crate) enum FormatKind {
    /// Comma-separated values.
    Csv(CsvDecoder),
    /// Apache Parquet.
    Parquet(ParquetFormat),
}
876
877impl OneshotFormat for FormatKind {
878    type WorkRequest<S>
879        = RequestKind<S::Object, S::Checksum>
880    where
881        S: OneshotSource;
882    type RecordChunk = RecordChunkKind;
883
884    async fn split_work<S: OneshotSource + Send>(
885        &self,
886        source: S,
887        object: S::Object,
888        checksum: S::Checksum,
889    ) -> Result<Vec<Self::WorkRequest<S>>, StorageErrorX> {
890        match self {
891            FormatKind::Csv(csv) => {
892                let work = csv
893                    .split_work(source, object, checksum)
894                    .await
895                    .context("csv")?
896                    .into_iter()
897                    .map(RequestKind::Csv)
898                    .collect();
899                Ok(work)
900            }
901            FormatKind::Parquet(parquet) => {
902                let work = parquet
903                    .split_work(source, object, checksum)
904                    .await
905                    .context("parquet")?
906                    .into_iter()
907                    .map(RequestKind::Parquet)
908                    .collect();
909                Ok(work)
910            }
911        }
912    }
913
914    fn fetch_work<'a, S: OneshotSource + Sync + 'static>(
915        &'a self,
916        source: &'a S,
917        request: Self::WorkRequest<S>,
918    ) -> BoxStream<'a, Result<Self::RecordChunk, StorageErrorX>> {
919        match (self, request) {
920            (FormatKind::Csv(csv), RequestKind::Csv(request)) => csv
921                .fetch_work(source, request)
922                .map_ok(RecordChunkKind::Csv)
923                .map(|result| result.context("csv"))
924                .boxed(),
925            (FormatKind::Parquet(parquet), RequestKind::Parquet(request)) => parquet
926                .fetch_work(source, request)
927                .map_ok(RecordChunkKind::Parquet)
928                .map(|result| result.context("parquet"))
929                .boxed(),
930            (FormatKind::Parquet(_), RequestKind::Csv(_))
931            | (FormatKind::Csv(_), RequestKind::Parquet(_)) => {
932                unreachable!("programming error, {self:?}")
933            }
934        }
935    }
936
937    fn decode_chunk(
938        &self,
939        chunk: Self::RecordChunk,
940        rows: &mut Vec<Row>,
941    ) -> Result<usize, StorageErrorX> {
942        match (self, chunk) {
943            (FormatKind::Csv(csv), RecordChunkKind::Csv(chunk)) => {
944                csv.decode_chunk(chunk, rows).context("csv")
945            }
946            (FormatKind::Parquet(parquet), RecordChunkKind::Parquet(chunk)) => {
947                parquet.decode_chunk(chunk, rows).context("parquet")
948            }
949            (FormatKind::Parquet(_), RecordChunkKind::Csv(_))
950            | (FormatKind::Csv(_), RecordChunkKind::Parquet(_)) => {
951                unreachable!("programming error, {self:?}")
952            }
953        }
954    }
955}
956
/// Enum wrapper around each [`OneshotFormat`]'s work-request type, generic
/// over the source's object (`O`) and checksum (`C`) types.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub(crate) enum RequestKind<O, C> {
    Csv(CsvWorkRequest<O, C>),
    Parquet(ParquetWorkRequest<O, C>),
}
962
/// Enum wrapper around each [`OneshotFormat`]'s record-chunk type.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub(crate) enum RecordChunkKind {
    Csv(CsvRecord),
    Parquet(ParquetRowGroup),
}
968
/// Filter that decides which discovered objects should be ingested.
pub(crate) enum ObjectFilter {
    /// Include every object.
    None,
    /// Include only objects whose path is in this exact set.
    Files(BTreeSet<Box<str>>),
    /// Include objects whose path matches a glob pattern.
    Pattern(glob::Pattern),
}
974
975impl ObjectFilter {
976    pub fn try_new(filter: ContentFilter) -> Result<Self, anyhow::Error> {
977        match filter {
978            ContentFilter::None => Ok(ObjectFilter::None),
979            ContentFilter::Files(files) => {
980                let files = files.into_iter().map(|f| f.into()).collect();
981                Ok(ObjectFilter::Files(files))
982            }
983            ContentFilter::Pattern(pattern) => {
984                let pattern = glob::Pattern::new(&pattern)?;
985                Ok(ObjectFilter::Pattern(pattern))
986            }
987        }
988    }
989
990    /// Returns if the object should be included.
991    pub fn filter<S: OneshotSource>(&self, object: &S::Object) -> bool {
992        match self {
993            ObjectFilter::None => true,
994            ObjectFilter::Files(files) => files.contains(object.path()),
995            ObjectFilter::Pattern(pattern) => pattern.matches(object.path()),
996        }
997    }
998}
999
/// Experimental Error Type.
///
/// The goal of this type is to combine concepts from both `thiserror` and
/// `anyhow`. Having "strongly typed" errors from `thiserror` is useful for
/// determining what action to take and tracking the context of an error like
/// `anyhow` is useful for determining where an error came from.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct StorageErrorX {
    /// The strongly typed cause of the error.
    kind: StorageErrorXKind,
    /// Human-readable context entries accumulated as the error propagates.
    context: LinkedList<String>,
}
1011
1012impl fmt::Display for StorageErrorX {
1013    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1014        writeln!(f, "error: {}", self.kind)?;
1015        writeln!(f, "causes: {:?}", self.context)?;
1016        Ok(())
1017    }
1018}
1019
/// Experimental Error Type, see [`StorageErrorX`].
///
/// Variants carry their message as `Arc<str>`/`String` so the enum stays
/// `Clone`, `Serialize`, and `Deserialize` even when the underlying error
/// types are not.
#[derive(Debug, Clone, Serialize, Deserialize, thiserror::Error)]
pub enum StorageErrorXKind {
    #[error("csv decoding error: {0}")]
    CsvDecoding(Arc<str>),
    #[error("parquet error: {0}")]
    ParquetError(Arc<str>),
    #[error("reqwest error: {0}")]
    Reqwest(Arc<str>),
    #[error("aws s3 request error: {0}")]
    AwsS3Request(String),
    #[error("aws s3 bytestream error: {0}")]
    AwsS3Bytes(Arc<str>),
    #[error("invalid reqwest header: {0}")]
    InvalidHeader(Arc<str>),
    #[error("failed to decode Row from a record batch: {0}")]
    InvalidRecordBatch(Arc<str>),
    #[error("programming error: {0}")]
    ProgrammingError(Arc<str>),
    #[error("failed to get the size of an object")]
    MissingSize,
    #[error("object is missing the required '{0}' field")]
    MissingField(Arc<str>),
    #[error("failed while evaluating the provided mfp: '{0}'")]
    MfpEvalError(Arc<str>),
    // Catch-all for errors that don't warrant their own variant.
    #[error("something went wrong: {0}")]
    Generic(String),
}
1048
1049impl From<csv_async::Error> for StorageErrorXKind {
1050    fn from(err: csv_async::Error) -> Self {
1051        StorageErrorXKind::CsvDecoding(err.to_string().into())
1052    }
1053}
1054
1055impl From<reqwest::Error> for StorageErrorXKind {
1056    fn from(err: reqwest::Error) -> Self {
1057        StorageErrorXKind::Reqwest(err.to_string().into())
1058    }
1059}
1060
1061impl From<reqwest::header::ToStrError> for StorageErrorXKind {
1062    fn from(err: reqwest::header::ToStrError) -> Self {
1063        StorageErrorXKind::InvalidHeader(err.to_string().into())
1064    }
1065}
1066
1067impl From<aws_smithy_types::byte_stream::error::Error> for StorageErrorXKind {
1068    fn from(err: aws_smithy_types::byte_stream::error::Error) -> Self {
1069        StorageErrorXKind::AwsS3Request(err.to_string())
1070    }
1071}
1072
1073impl From<::parquet::errors::ParquetError> for StorageErrorXKind {
1074    fn from(err: ::parquet::errors::ParquetError) -> Self {
1075        StorageErrorXKind::ParquetError(err.to_string().into())
1076    }
1077}
1078
1079impl StorageErrorXKind {
1080    pub fn with_context<C: Display>(self, context: C) -> StorageErrorX {
1081        StorageErrorX {
1082            kind: self,
1083            context: LinkedList::from([context.to_string()]),
1084        }
1085    }
1086
1087    pub fn invalid_record_batch<S: Into<Arc<str>>>(error: S) -> StorageErrorXKind {
1088        StorageErrorXKind::InvalidRecordBatch(error.into())
1089    }
1090
1091    pub fn generic<C: Display>(error: C) -> StorageErrorXKind {
1092        StorageErrorXKind::Generic(error.to_string())
1093    }
1094
1095    pub fn programming_error<S: Into<Arc<str>>>(error: S) -> StorageErrorXKind {
1096        StorageErrorXKind::ProgrammingError(error.into())
1097    }
1098}
1099
1100impl<E> From<E> for StorageErrorX
1101where
1102    E: Into<StorageErrorXKind>,
1103{
1104    fn from(err: E) -> Self {
1105        StorageErrorX {
1106            kind: err.into(),
1107            context: LinkedList::new(),
1108        }
1109    }
1110}
1111
/// Helper trait for attaching human-readable context to a fallible result,
/// analogous to `anyhow`'s `context` but producing a [`StorageErrorX`].
trait StorageErrorXContext<T> {
    fn context<C>(self, context: C) -> Result<T, StorageErrorX>
    where
        C: Display;
}
1117
1118impl<T, E> StorageErrorXContext<T> for Result<T, E>
1119where
1120    E: Into<StorageErrorXKind>,
1121{
1122    fn context<C>(self, context: C) -> Result<T, StorageErrorX>
1123    where
1124        C: Display,
1125    {
1126        match self {
1127            Ok(val) => Ok(val),
1128            Err(kind) => Err(StorageErrorX {
1129                kind: kind.into(),
1130                context: LinkedList::from([context.to_string()]),
1131            }),
1132        }
1133    }
1134}
1135
1136impl<T> StorageErrorXContext<T> for Result<T, StorageErrorX> {
1137    fn context<C>(self, context: C) -> Result<T, StorageErrorX>
1138    where
1139        C: Display,
1140    {
1141        match self {
1142            Ok(val) => Ok(val),
1143            Err(mut e) => {
1144                e.context.push_back(context.to_string());
1145                Err(e)
1146            }
1147        }
1148    }
1149}