Skip to main content

mz_storage_operators/
oneshot_source.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
//! "Oneshot" sources are a one-time ingestion of data from an external system. Unlike traditional
//! sources, they __do not__ run continuously. Oneshot sources are generally used for `COPY FROM`
//! SQL statements.
13//!
//! The implementation of reading and parsing data is behind the [`OneshotSource`] and
//! [`OneshotFormat`] traits, respectively. Users looking to add new sources or formats should
//! only need to add new implementations for these traits.
17//!
18//! * [`OneshotSource`] is an interface for listing and reading from an external system, e.g. an
19//!   HTTP server.
20//! * [`OneshotFormat`] is an interface for how to parallelize and parse data, e.g. CSV.
21//!
22//! Given a [`OneshotSource`] and a [`OneshotFormat`] we build a dataflow structured like the
23//! following:
24//!
25//! ```text
26//!             ┏━━━━━━━━━━━━━━━┓
27//!             ┃    Discover   ┃
28//!             ┃    objects    ┃
29//!             ┗━━━━━━━┯━━━━━━━┛
30//!           ┌───< Distribute >───┐
31//!           │                    │
32//!     ┏━━━━━v━━━━┓         ┏━━━━━v━━━━┓
33//!     ┃  Split   ┃   ...   ┃  Split   ┃
34//!     ┃  Work 1  ┃         ┃  Work n  ┃
35//!     ┗━━━━━┯━━━━┛         ┗━━━━━┯━━━━┛
36//!           │                    │
37//!           ├───< Distribute >───┤
38//!           │                    │
39//!     ┏━━━━━v━━━━┓         ┏━━━━━v━━━━┓
40//!     ┃  Fetch   ┃   ...   ┃  Fetch   ┃
41//!     ┃  Work 1  ┃         ┃  Work n  ┃
42//!     ┗━━━━━┯━━━━┛         ┗━━━━━┯━━━━┛
43//!           │                    │
44//!           ├───< Distribute >───┤
45//!           │                    │
46//!     ┏━━━━━v━━━━┓         ┏━━━━━v━━━━┓
47//!     ┃  Decode  ┃   ...   ┃  Decode  ┃
48//!     ┃  Chunk 1 ┃         ┃  Chunk n ┃
49//!     ┗━━━━━┯━━━━┛         ┗━━━━━┯━━━━┛
50//!           │                    │
51//!           │                    │
52//!     ┏━━━━━v━━━━┓         ┏━━━━━v━━━━┓
53//!     ┃  Stage   ┃   ...   ┃  Stage   ┃
54//!     ┃  Batch 1 ┃         ┃  Batch n ┃
55//!     ┗━━━━━┯━━━━┛         ┗━━━━━┯━━━━┛
56//!           │                    │
57//!           └─────────┬──────────┘
58//!               ┏━━━━━v━━━━┓
59//!               ┃  Result  ┃
60//!               ┃ Callback ┃
61//!               ┗━━━━━━━━━━┛
62//! ```
63//!
64
65use std::fmt;
66use std::sync::Arc;
67
68use aws_sdk_s3::error::DisplayErrorContext;
69use bytes::Bytes;
70use differential_dataflow::Hashable;
71use futures::stream::BoxStream;
72use futures::{StreamExt, TryStreamExt};
73use mz_expr::SafeMfpPlan;
74use mz_ore::cast::CastFrom;
75use mz_ore::error::ErrorExt;
76use mz_persist_client::Diagnostics;
77use mz_persist_client::batch::ProtoBatch;
78use mz_persist_client::cache::PersistClientCache;
79use mz_persist_types::Codec;
80use mz_persist_types::codec_impls::UnitSchema;
81use mz_repr::{DatumVec, GlobalId, Row, RowArena, Timestamp};
82use mz_storage_types::StorageDiff;
83use mz_storage_types::connections::ConnectionContext;
84use mz_storage_types::controller::CollectionMetadata;
85use mz_storage_types::oneshot_sources::{
86    ContentFilter, ContentFormat, ContentSource, OneshotIngestionRequest,
87};
88use mz_storage_types::sources::SourceData;
89use mz_timely_util::builder_async::{
90    Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
91};
92use mz_timely_util::pact::Distribute;
93use serde::de::DeserializeOwned;
94use serde::{Deserialize, Serialize};
95use std::collections::{BTreeSet, LinkedList};
96use std::fmt::{Debug, Display};
97use std::future::Future;
98use timely::container::CapacityContainerBuilder;
99use timely::dataflow::channels::pact::Pipeline;
100use timely::dataflow::{Scope, StreamVec};
101use timely::progress::Antichain;
102use tracing::info;
103
104use crate::oneshot_source::aws_source::{AwsS3Source, S3Checksum, S3Object};
105use crate::oneshot_source::csv::{CsvDecoder, CsvRecord, CsvWorkRequest};
106use crate::oneshot_source::http_source::{
107    HttpChecksum, HttpObject, HttpOneshotSource, build_http_client,
108};
109use crate::oneshot_source::parquet::{ParquetFormat, ParquetRowGroup, ParquetWorkRequest};
110
111pub mod csv;
112pub mod parquet;
113
114pub mod aws_source;
115pub mod http_source;
116
117mod util;
118
119/// Render a dataflow to do a "oneshot" ingestion.
120///
121/// Roughly the operators we render do the following:
122///
123/// 1. Discover objects with a [`OneshotSource`].
124/// 2. Split objects into separate units of work based on the [`OneshotFormat`].
125/// 3. Fetch individual units of work (aka fetch byte blobs) with the
126///    [`OneshotFormat`] and [`OneshotSource`].
127/// 4. Decode the fetched byte blobs into [`Row`]s.
128/// 5. Stage the [`Row`]s into Persist returning [`ProtoBatch`]es.
129///
130/// TODO(cf3): Benchmark combining operators 3, 4, and 5. Currently we keep them
131/// separate for the [`CsvDecoder`]. CSV decoding is hard to do in parallel so we
132/// currently have a single worker Fetch an entire file, and then distributes
133/// chunks for parallel Decoding. We should benchmark if this is actually faster
134/// than just a single worker both fetching and decoding.
135///
136pub fn render<'scope, F>(
137    scope: Scope<'scope, Timestamp>,
138    persist_clients: Arc<PersistClientCache>,
139    connection_context: ConnectionContext,
140    collection_id: GlobalId,
141    collection_meta: CollectionMetadata,
142    request: OneshotIngestionRequest,
143    enforce_external_addresses: bool,
144    worker_callback: F,
145) -> Vec<PressOnDropButton>
146where
147    F: FnOnce(Result<Option<ProtoBatch>, String>) -> () + 'static,
148{
149    let OneshotIngestionRequest {
150        source,
151        format,
152        filter,
153        shape,
154    } = request;
155
156    let source = match source {
157        ContentSource::Http { url } => {
158            let client = match build_http_client(enforce_external_addresses) {
159                Ok(client) => client,
160                Err(err) => {
161                    worker_callback(Err(format!(
162                        "failed to build HTTP client for COPY FROM: {err}"
163                    )));
164                    return Vec::new();
165                }
166            };
167            let source = HttpOneshotSource::new(client, url);
168            SourceKind::Http(source)
169        }
170        ContentSource::AwsS3 {
171            connection,
172            connection_id,
173            uri,
174        } => {
175            // Checksum validation does not work with GCS when using ranges, which happens with parquet.
176            // So, we disable checksum if both the endpoint is overridden to a non-AWS endpoint and the format is Parquet.
177            let use_checksum = !(connection.endpoint.is_some()
178                && !connection.endpoint.as_ref().unwrap().contains("amazonaws"))
179                || format != ContentFormat::Parquet;
180            if !use_checksum {
181                tracing::info!(
182                    "disabling checksum validation for S3 source because endpoint: {:?} is overridden and format is Parquet",
183                    connection.endpoint
184                );
185            }
186            let source = AwsS3Source::new(
187                connection,
188                connection_id,
189                connection_context,
190                uri,
191                use_checksum,
192                enforce_external_addresses,
193            );
194            SourceKind::AwsS3(source)
195        }
196    };
197    tracing::info!(?source, "created oneshot source");
198
199    let format = match format {
200        ContentFormat::Csv(params) => {
201            let format = CsvDecoder::new(params, &shape.source_desc);
202            FormatKind::Csv(format)
203        }
204        ContentFormat::Parquet => {
205            let format = ParquetFormat::new(shape.source_desc);
206            FormatKind::Parquet(format)
207        }
208    };
209
210    // Discover what objects are available to copy.
211    let (objects_stream, discover_token) =
212        render_discover_objects(scope.clone(), collection_id, source.clone(), filter);
213    // Split the objects into individual units of work.
214    let (work_stream, split_token) = render_split_work(
215        scope.clone(),
216        collection_id,
217        objects_stream,
218        source.clone(),
219        format.clone(),
220    );
221    // Fetch each unit of work, returning chunks of records.
222    let (records_stream, fetch_token) = render_fetch_work(
223        scope.clone(),
224        collection_id,
225        source.clone(),
226        format.clone(),
227        work_stream,
228    );
229    // Parse chunks of records into Rows.
230    let (rows_stream, decode_token) = render_decode_chunk(
231        scope.clone(),
232        format.clone(),
233        records_stream,
234        shape.source_mfp,
235    );
236    // Stage the Rows in Persist.
237    let (batch_stream, batch_token) = render_stage_batches_operator(
238        scope.clone(),
239        collection_id,
240        &collection_meta,
241        persist_clients,
242        rows_stream,
243    );
244
245    // Collect all results together and notify the upstream of whether or not we succeeded.
246    render_completion_operator(scope, batch_stream, worker_callback);
247
248    let tokens = vec![
249        discover_token,
250        split_token,
251        fetch_token,
252        decode_token,
253        batch_token,
254    ];
255
256    tokens
257}
258
259/// Render an operator that using a [`OneshotSource`] will discover what objects are available
260/// for fetching.
261pub fn render_discover_objects<'scope, S>(
262    scope: Scope<'scope, Timestamp>,
263    collection_id: GlobalId,
264    source: S,
265    filter: ContentFilter,
266) -> (
267    StreamVec<'scope, Timestamp, Result<(S::Object, S::Checksum), StorageErrorX>>,
268    PressOnDropButton,
269)
270where
271    S: OneshotSource + 'static,
272{
273    // Only a single worker is responsible for discovering objects.
274    let worker_id = scope.index();
275    let num_workers = scope.peers();
276    let active_worker_id = usize::cast_from((collection_id, "discover").hashed()) % num_workers;
277    let is_active_worker = worker_id == active_worker_id;
278
279    let mut builder = AsyncOperatorBuilder::new("CopyFrom-discover".to_string(), scope.clone());
280
281    let (start_handle, start_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
282
283    let shutdown = builder.build(move |caps| async move {
284        let [start_cap] = caps.try_into().unwrap();
285
286        if !is_active_worker {
287            return;
288        }
289
290        let filter = match ObjectFilter::try_new(filter) {
291            Ok(filter) => filter,
292            Err(err) => {
293                tracing::warn!(?err, "failed to create filter");
294                start_handle.give(&start_cap, Err(StorageErrorXKind::generic(err).into()));
295                return;
296            }
297        };
298
299        let work = source.list().await.context("list");
300        match work {
301            Ok(objects) => {
302                let (include, exclude): (Vec<_>, Vec<_>) = objects
303                    .into_iter()
304                    .partition(|(o, _checksum)| filter.filter::<S>(o));
305                tracing::info!(%worker_id, ?include, ?exclude, "listed objects");
306                if include.is_empty() {
307                    let err = StorageErrorXKind::NoMatchingFiles.with_context("discover");
308                    start_handle.give(&start_cap, Err(err));
309                    return;
310                }
311                include
312                    .into_iter()
313                    .for_each(|object| start_handle.give(&start_cap, Ok(object)))
314            }
315            Err(err) => {
316                tracing::warn!(?err, "failed to list oneshot source");
317                start_handle.give(&start_cap, Err(err))
318            }
319        }
320    });
321
322    (start_stream, shutdown.press_on_drop())
323}
324
325/// Render an operator that given a stream of [`OneshotSource::Object`]s will split them into units
326/// of work based on the provided [`OneshotFormat`].
327pub fn render_split_work<'scope, T, S, F>(
328    scope: Scope<'scope, T>,
329    collection_id: GlobalId,
330    objects: StreamVec<'scope, T, Result<(S::Object, S::Checksum), StorageErrorX>>,
331    source: S,
332    format: F,
333) -> (
334    StreamVec<'scope, T, Result<F::WorkRequest<S>, StorageErrorX>>,
335    PressOnDropButton,
336)
337where
338    T: timely::progress::Timestamp,
339    S: OneshotSource + Send + Sync + 'static,
340    F: OneshotFormat + Send + Sync + 'static,
341{
342    let worker_id = scope.index();
343    let mut builder = AsyncOperatorBuilder::new("CopyFrom-split_work".to_string(), scope.clone());
344
345    let (request_handle, request_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
346    let mut objects_handle = builder.new_input_for(objects, Distribute, &request_handle);
347
348    let shutdown = builder.build(move |caps| async move {
349        let [_objects_cap] = caps.try_into().unwrap();
350
351        info!(%collection_id, %worker_id, "CopyFrom Split Work");
352
353        while let Some(event) = objects_handle.next().await {
354            let (capability, maybe_objects) = match event {
355                AsyncEvent::Data(cap, req) => (cap, req),
356                AsyncEvent::Progress(_) => continue,
357            };
358
359            // Nest the `split_work(...)` method in an async-block so we can use the `?`
360            // without returning from the entire operator, and so we can add context.
361            let result = async {
362                let mut requests = Vec::new();
363
364                for maybe_object in maybe_objects {
365                    // Return early if the upstream Discover step failed.
366                    let (object, checksum) = maybe_object?;
367
368                    let format_ = format.clone();
369                    let source_ = source.clone();
370                    let work_requests = mz_ore::task::spawn(|| "split-work", async move {
371                        info!(%worker_id, object = %object.name(), "splitting object");
372                        format_.split_work(source_.clone(), object, checksum).await
373                    })
374                    .await?;
375
376                    requests.extend(work_requests);
377                }
378
379                Ok::<_, StorageErrorX>(requests)
380            }
381            .await
382            .context("split");
383
384            match result {
385                Ok(requests) => requests
386                    .into_iter()
387                    .for_each(|req| request_handle.give(&capability, Ok(req))),
388                Err(err) => request_handle.give(&capability, Err(err)),
389            }
390        }
391    });
392
393    (request_stream, shutdown.press_on_drop())
394}
395
396/// Render an operator that given a stream [`OneshotFormat::WorkRequest`]s will fetch chunks of the
397/// remote [`OneshotSource::Object`] and return a stream of [`OneshotFormat::RecordChunk`]s that
398/// can be decoded into [`Row`]s.
399pub fn render_fetch_work<'scope, T, S, F>(
400    scope: Scope<'scope, T>,
401    collection_id: GlobalId,
402    source: S,
403    format: F,
404    work_requests: StreamVec<'scope, T, Result<F::WorkRequest<S>, StorageErrorX>>,
405) -> (
406    StreamVec<'scope, T, Result<F::RecordChunk, StorageErrorX>>,
407    PressOnDropButton,
408)
409where
410    T: timely::progress::Timestamp,
411    S: OneshotSource + Sync + 'static,
412    F: OneshotFormat + Sync + 'static,
413{
414    let worker_id = scope.index();
415    let mut builder = AsyncOperatorBuilder::new("CopyFrom-fetch_work".to_string(), scope.clone());
416
417    let (record_handle, record_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
418    let mut work_requests_handle = builder.new_input_for(work_requests, Distribute, &record_handle);
419
420    let shutdown = builder.build(move |caps| async move {
421        let [_work_cap] = caps.try_into().unwrap();
422
423        info!(%collection_id, %worker_id, "CopyFrom Fetch Work");
424
425        while let Some(event) = work_requests_handle.next().await {
426            let (capability, maybe_requests) = match event {
427                AsyncEvent::Data(cap, req) => (cap, req),
428                AsyncEvent::Progress(_) => continue,
429            };
430
431            // Wrap our work in a block to capture `?`.
432            let result = async {
433                // Process each stream of work, one at a time.
434                for maybe_request in maybe_requests {
435                    let request = maybe_request?;
436
437                    let mut work_stream = format.fetch_work(&source, request);
438                    while let Some(result) = work_stream.next().await {
439                        // Returns early and stop consuming from the stream if we hit an error.
440                        let record_chunk = result.context("fetch worker")?;
441
442                        // Note: We want to give record chunks as we receive them, because a work
443                        // request may be for an entire file.
444                        //
445                        // TODO(cf3): Investigate if some small batching here would help perf.
446                        record_handle.give(&capability, Ok(record_chunk));
447                    }
448                }
449
450                Ok::<_, StorageErrorX>(())
451            }
452            .await
453            .context("fetch work");
454
455            if let Err(err) = result {
456                tracing::warn!(?err, "failed to fetch");
457                record_handle.give(&capability, Err(err))
458            }
459        }
460    });
461
462    (record_stream, shutdown.press_on_drop())
463}
464
/// Render an operator that given a stream of [`OneshotFormat::RecordChunk`]s will decode these
/// chunks into a stream of [`Row`]s.
///
/// Every decoded row is passed through `mfp` before it is emitted. If a batch of chunks contains
/// an upstream error or fails to decode, a single error is emitted for that batch and none of its
/// rows are. MFP evaluation errors, by contrast, are emitted per row.
pub fn render_decode_chunk<'scope, T, F>(
    scope: Scope<'scope, T>,
    format: F,
    record_chunks: StreamVec<'scope, T, Result<F::RecordChunk, StorageErrorX>>,
    mfp: SafeMfpPlan,
) -> (
    StreamVec<'scope, T, Result<Row, StorageErrorX>>,
    PressOnDropButton,
)
where
    T: timely::progress::Timestamp,
    F: OneshotFormat + 'static,
{
    let mut builder = AsyncOperatorBuilder::new("CopyFrom-decode_chunk".to_string(), scope.clone());

    let (row_handle, row_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
    // `Distribute` spreads record chunks across workers so decoding can run in parallel.
    let mut record_chunk_handle = builder.new_input_for(record_chunks, Distribute, &row_handle);

    let shutdown = builder.build(move |caps| async move {
        let [_row_cap] = caps.try_into().unwrap();

        // Scratch buffers reused for every row this operator evaluates.
        let mut datum_vec = DatumVec::default();
        let mut row_buf = Row::default();

        while let Some(event) = record_chunk_handle.next().await {
            let (capability, maybe_chunks) = match event {
                AsyncEvent::Data(cap, data) => (cap, data),
                AsyncEvent::Progress(_) => continue,
            };

            // Decode every chunk in this batch; `?` abandons the whole batch on the first error.
            let result = async {
                let mut rows = Vec::new();
                for maybe_chunk in maybe_chunks {
                    let chunk = maybe_chunk?;
                    format.decode_chunk(chunk, &mut rows)?;
                }
                Ok::<_, StorageErrorX>(rows)
            }
            .await
            .context("decode chunk");

            match result {
                Ok(rows) => {
                    // Scoped to this batch of chunks so the arena's retained capacity does not
                    // outlive it, and cleared per row below so it only ever holds one row's MFP
                    // temporaries rather than accumulating the whole source.
                    let mut row_arena = RowArena::new();
                    // For each row of source data, we pass it through an MFP to re-arrange column
                    // orders and/or fill in default values for missing columns.
                    for row in rows {
                        row_arena.clear();
                        let mut datums = datum_vec.borrow_with(&row);
                        let result = mfp
                            .evaluate_into(&mut *datums, &row_arena, &mut row_buf)
                            .map(|row| row.cloned());

                        match result {
                            Ok(Some(row)) => row_handle.give(&capability, Ok(row)),
                            Ok(None) => {
                                // We only expect the provided MFP to map rows from the source data
                                // and project default values.
                                mz_ore::soft_panic_or_log!("oneshot source MFP filtered out data!");
                            }
                            Err(e) => {
                                let err = StorageErrorXKind::MfpEvalError(e.to_string().into())
                                    .with_context("decode");
                                row_handle.give(&capability, Err(err))
                            }
                        }
                    }
                }
                Err(err) => row_handle.give(&capability, Err(err)),
            }
        }
    });

    (row_stream, shutdown.press_on_drop())
}
545
546/// Render an operator that given a stream of [`Row`]s will stage them in Persist and return a
547/// stream of [`ProtoBatch`]es that can later be linked into a shard.
548pub fn render_stage_batches_operator<'scope, T>(
549    scope: Scope<'scope, T>,
550    collection_id: GlobalId,
551    collection_meta: &CollectionMetadata,
552    persist_clients: Arc<PersistClientCache>,
553    rows_stream: StreamVec<'scope, T, Result<Row, StorageErrorX>>,
554) -> (
555    StreamVec<'scope, T, Result<ProtoBatch, StorageErrorX>>,
556    PressOnDropButton,
557)
558where
559    T: timely::progress::Timestamp,
560{
561    let persist_location = collection_meta.persist_location.clone();
562    let shard_id = collection_meta.data_shard;
563    let collection_desc = Arc::new(collection_meta.relation_desc.clone());
564
565    let mut builder =
566        AsyncOperatorBuilder::new("CopyFrom-stage_batches".to_string(), scope.clone());
567
568    let (proto_batch_handle, proto_batch_stream) =
569        builder.new_output::<CapacityContainerBuilder<_>>();
570    let mut rows_handle = builder.new_input_for(rows_stream, Pipeline, &proto_batch_handle);
571
572    let shutdown = builder.build(move |caps| async move {
573        let [proto_batch_cap] = caps.try_into().unwrap();
574
575        // Open a Persist handle that we can use to stage a batch.
576        let persist_client = persist_clients
577            .open(persist_location)
578            .await
579            .expect("failed to open Persist client");
580        let persist_diagnostics = Diagnostics {
581            shard_name: collection_id.to_string(),
582            handle_purpose: "CopyFrom::stage_batches".to_string(),
583        };
584        let write_handle = persist_client
585            .open_writer::<SourceData, (), mz_repr::Timestamp, StorageDiff>(
586                shard_id,
587                Arc::clone(&collection_desc),
588                Arc::new(UnitSchema),
589                persist_diagnostics,
590            )
591            .await
592            .expect("could not open Persist shard");
593
594        // Create a batch using the minimum timestamp since these batches will
595        // get sent back to `environmentd` and their timestamps re-written
596        // before being finally appended.
597        let lower = mz_repr::Timestamp::MIN;
598        let upper = Antichain::from_elem(lower.step_forward());
599
600        let mut batch_builder = write_handle.builder(Antichain::from_elem(lower));
601
602        while let Some(event) = rows_handle.next().await {
603            let AsyncEvent::Data(_, row_batch) = event else {
604                continue;
605            };
606
607            // Pull Rows off our stream and stage them into a Batch.
608            for maybe_row in row_batch {
609                let maybe_row = maybe_row.and_then(|row| {
610                    Row::validate(&row, &*collection_desc).map_err(|e| {
611                        StorageErrorXKind::invalid_record_batch(e).with_context("stage_batches")
612                    })?;
613                    Ok(row)
614                });
615                match maybe_row {
616                    // Happy path, add the Row to our batch!
617                    Ok(row) => {
618                        let data = SourceData(Ok(row));
619                        batch_builder
620                            .add(&data, &(), &lower, &1)
621                            .await
622                            .expect("failed to add Row to batch");
623                    }
624                    // Sad path, something upstream hit an error.
625                    Err(err) => {
626                        // Clean up our in-progress batch so we don't leak data.
627                        let batch = batch_builder
628                            .finish(upper)
629                            .await
630                            .expect("failed to cleanup batch");
631                        batch.delete().await;
632
633                        // Pass on the error.
634                        proto_batch_handle
635                            .give(&proto_batch_cap, Err(err).context("stage batches"));
636                        return;
637                    }
638                }
639            }
640        }
641
642        let batch = batch_builder
643            .finish(upper)
644            .await
645            .expect("failed to create Batch");
646
647        // Turn our Batch into a ProtoBatch that will later be linked in to
648        // the shard.
649        //
650        // Note: By turning this into a ProtoBatch, the onus is now on us to
651        // cleanup the Batch if it's never linked into the shard.
652        //
653        // TODO(cf2): Make sure these batches get cleaned up if another
654        // worker encounters an error.
655        let proto_batch = batch.into_transmittable_batch();
656        proto_batch_handle.give(&proto_batch_cap, Ok(proto_batch));
657    });
658
659    (proto_batch_stream, shutdown.press_on_drop())
660}
661
662/// Render an operator that given a stream of [`ProtoBatch`]es will call our `worker_callback` to
663/// report the results upstream.
664pub fn render_completion_operator<'scope, T, F>(
665    scope: Scope<'scope, T>,
666    results_stream: StreamVec<'scope, T, Result<ProtoBatch, StorageErrorX>>,
667    worker_callback: F,
668) where
669    T: timely::progress::Timestamp,
670    F: FnOnce(Result<Option<ProtoBatch>, String>) -> () + 'static,
671{
672    let mut builder = AsyncOperatorBuilder::new("CopyFrom-completion".to_string(), scope.clone());
673    let mut results_input = builder.new_disconnected_input(results_stream, Pipeline);
674
675    builder.build(move |_| async move {
676        let result = async move {
677            let mut maybe_payload: Option<ProtoBatch> = None;
678
679            while let Some(event) = results_input.next().await {
680                if let AsyncEvent::Data(_cap, results) = event {
681                    let [result] = results
682                        .try_into()
683                        .expect("only 1 event on the result stream");
684
685                    // TODO(cf2): Lift this restriction.
686                    if maybe_payload.is_some() {
687                        panic!("expected only one batch!");
688                    }
689
690                    maybe_payload = Some(result.map_err(|e| e.to_string())?);
691                }
692            }
693
694            Ok(maybe_payload)
695        }
696        .await;
697
698        // Report to the caller of our final status.
699        worker_callback(result);
700    });
701}
702
/// An object that will be fetched from a [`OneshotSource`].
pub trait OneshotObject {
    /// Name of the object, including any extensions.
    fn name(&self) -> &str;

    /// Path of the object within the remote source.
    fn path(&self) -> &str;

    /// Size of this object in bytes.
    fn size(&self) -> usize;

    /// Encodings of the _entire_ object, if any.
    ///
    /// Note: The object may internally use compression, e.g. a Parquet file
    /// could compress its column chunks, but if the Parquet file itself is not
    /// compressed then this would return an empty slice.
    fn encodings(&self) -> &[Encoding];
}
721
/// Encoding of a [`OneshotObject`].
///
/// Describes an encoding applied to the object as a whole, as reported by
/// [`OneshotObject::encodings`], not compression internal to the file format.
///
/// NOTE(review): this derives `Serialize`/`Deserialize`. If it is ever sent with a serde format
/// that encodes variant indices, reordering variants would break compatibility.
#[derive(Debug, Copy, Clone, Serialize, Deserialize)]
pub enum Encoding {
    Bzip2,
    Gzip,
    Xz,
    Zstd,
}
730
/// Defines a remote system that we can fetch data from for a "one time" ingestion.
pub trait OneshotSource: Clone + Send + Unpin {
    /// An individual unit within the source, e.g. a file.
    type Object: OneshotObject
        + Debug
        + Clone
        + Send
        + Unpin
        + Serialize
        + DeserializeOwned
        + 'static;
    /// Checksum for a [`Self::Object`].
    type Checksum: Debug + Clone + Send + Unpin + Serialize + DeserializeOwned + 'static;

    /// Returns all of the objects for this source, each paired with its checksum.
    fn list<'a>(
        &'a self,
    ) -> impl Future<Output = Result<Vec<(Self::Object, Self::Checksum)>, StorageErrorX>> + Send;

    /// Returns a stream of the data for a specific object.
    ///
    /// `checksum` is presumably the one paired with `object` by [`OneshotSource::list`]; confirm
    /// how each implementation validates it. When `range` is `Some`, only that inclusive range of
    /// the object is requested (presumably byte offsets; confirm against the implementations).
    fn get<'s>(
        &'s self,
        object: Self::Object,
        checksum: Self::Checksum,
        range: Option<std::ops::RangeInclusive<usize>>,
    ) -> BoxStream<'s, Result<Bytes, StorageErrorX>>;
}
758
/// An enum wrapper around [`OneshotSource`]s.
///
/// An alternative to this wrapper would be to use `Box<dyn OneshotSource>`, but that requires
/// making the trait object safe and it's easier to just wrap it in an enum. Also, this wrapper
/// provides a convenient place to add [`StorageErrorXContext::context`] for all of our source
/// types.
#[derive(Clone, Debug)]
pub(crate) enum SourceKind {
    /// Objects fetched over HTTP via [`HttpOneshotSource`].
    Http(HttpOneshotSource),
    /// Objects fetched from S3, or from an overridden S3-compatible endpoint, via [`AwsS3Source`].
    AwsS3(AwsS3Source),
}
770
771impl OneshotSource for SourceKind {
772    type Object = ObjectKind;
773    type Checksum = ChecksumKind;
774
775    async fn list<'a>(&'a self) -> Result<Vec<(Self::Object, Self::Checksum)>, StorageErrorX> {
776        match self {
777            SourceKind::Http(http) => {
778                let objects = http.list().await.context("http")?;
779                let objects = objects
780                    .into_iter()
781                    .map(|(object, checksum)| {
782                        (ObjectKind::Http(object), ChecksumKind::Http(checksum))
783                    })
784                    .collect();
785                Ok(objects)
786            }
787            SourceKind::AwsS3(s3) => {
788                let objects = s3.list().await.context("s3")?;
789                let objects = objects
790                    .into_iter()
791                    .map(|(object, checksum)| {
792                        (ObjectKind::AwsS3(object), ChecksumKind::AwsS3(checksum))
793                    })
794                    .collect();
795                Ok(objects)
796            }
797        }
798    }
799
800    fn get<'s>(
801        &'s self,
802        object: Self::Object,
803        checksum: Self::Checksum,
804        range: Option<std::ops::RangeInclusive<usize>>,
805    ) -> BoxStream<'s, Result<Bytes, StorageErrorX>> {
806        match (self, object, checksum) {
807            (SourceKind::Http(http), ObjectKind::Http(object), ChecksumKind::Http(checksum)) => {
808                http.get(object, checksum, range)
809                    .map(|result| result.context("http"))
810                    .boxed()
811            }
812            (SourceKind::AwsS3(s3), ObjectKind::AwsS3(object), ChecksumKind::AwsS3(checksum)) => s3
813                .get(object, checksum, range)
814                .map(|result| result.context("aws_s3"))
815                .boxed(),
816            (SourceKind::AwsS3(_) | SourceKind::Http(_), _, _) => {
817                unreachable!("programming error! wrong source, object, and checksum kind");
818            }
819        }
820    }
821}
822
/// Enum wrapper for [`OneshotSource::Object`], see [`SourceKind`] for more details.
///
/// Each variant corresponds to the [`SourceKind`] variant of the same name.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) enum ObjectKind {
    /// An object listed by an HTTP source.
    Http(HttpObject),
    /// An object listed by an AWS S3 source.
    AwsS3(S3Object),
}
829
830impl OneshotObject for ObjectKind {
831    fn name(&self) -> &str {
832        match self {
833            ObjectKind::Http(object) => object.name(),
834            ObjectKind::AwsS3(object) => object.name(),
835        }
836    }
837
838    fn path(&self) -> &str {
839        match self {
840            ObjectKind::Http(object) => object.path(),
841            ObjectKind::AwsS3(object) => object.path(),
842        }
843    }
844
845    fn size(&self) -> usize {
846        match self {
847            ObjectKind::Http(object) => object.size(),
848            ObjectKind::AwsS3(object) => object.size(),
849        }
850    }
851
852    fn encodings(&self) -> &[Encoding] {
853        match self {
854            ObjectKind::Http(object) => object.encodings(),
855            ObjectKind::AwsS3(object) => object.encodings(),
856        }
857    }
858}
859
/// Enum wrapper for [`OneshotSource::Checksum`], see [`SourceKind`] for more details.
///
/// Each variant corresponds to the [`SourceKind`] variant of the same name.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) enum ChecksumKind {
    /// Checksum produced by an HTTP source.
    Http(HttpChecksum),
    /// Checksum produced by an AWS S3 source.
    AwsS3(S3Checksum),
}
866
/// Defines a format that we fetch for a "one time" ingestion.
///
/// A format decides three things:
/// * how an upstream object is split into units of work ([`Self::WorkRequest`]);
/// * how each unit is fetched from a [`OneshotSource`] into [`Self::RecordChunk`]s;
/// * how those chunks are decoded into [`Row`]s.
pub trait OneshotFormat: Clone {
    /// A single unit of work for decoding this format, e.g. a single Parquet RowGroup.
    ///
    /// Generic over the source `S` so that a request can carry that source's object and
    /// checksum types. The `Serialize`/`DeserializeOwned` bounds let requests be serialized
    /// (NOTE(review): presumably to ship them to other workers; confirm).
    type WorkRequest<S>: Debug + Clone + Send + Serialize + DeserializeOwned + 'static
    where
        S: OneshotSource;
    /// A chunk of records in this format that can be decoded into Rows.
    type RecordChunk: Debug + Clone + Send + Serialize + DeserializeOwned + 'static;

    /// Given an upstream object, defines how we should parse this object in parallel.
    ///
    /// Note: It's totally fine to not process an object in parallel, and just return a single
    /// [`Self::WorkRequest`] here.
    fn split_work<S: OneshotSource + Send>(
        &self,
        source: S,
        object: S::Object,
        checksum: S::Checksum,
    ) -> impl Future<Output = Result<Vec<Self::WorkRequest<S>>, StorageErrorX>> + Send;

    /// Given a work request, fetch data from the [`OneshotSource`] and return it in a format that
    /// can later be decoded.
    ///
    /// The returned stream borrows both `self` and `source` for `'a`.
    fn fetch_work<'a, S: OneshotSource + Sync + 'static>(
        &'a self,
        source: &'a S,
        request: Self::WorkRequest<S>,
    ) -> BoxStream<'a, Result<Self::RecordChunk, StorageErrorX>>;

    /// Decode a chunk of records into [`Row`]s.
    ///
    /// Decoded rows are written into `rows`. NOTE(review): presumably they are appended, and
    /// the returned `usize` is the number of rows decoded from this chunk; confirm against the
    /// implementations.
    fn decode_chunk(
        &self,
        chunk: Self::RecordChunk,
        rows: &mut Vec<Row>,
    ) -> Result<usize, StorageErrorX>;
}
902
/// An enum wrapper around [`OneshotFormat`]s.
///
/// An alternative to this wrapper would be to use `Box<dyn OneshotFormat>`, but that requires
/// making the trait object safe, and it's easier to just wrap it in an enum. This wrapper also
/// provides a convenient place to add [`StorageErrorXContext::context`] for all of our format
/// types.
#[derive(Clone, Debug)]
pub(crate) enum FormatKind {
    /// CSV, decoded by a [`CsvDecoder`].
    Csv(CsvDecoder),
    /// Parquet, handled by a [`ParquetFormat`].
    Parquet(ParquetFormat),
}
914
915impl OneshotFormat for FormatKind {
916    type WorkRequest<S>
917        = RequestKind<S::Object, S::Checksum>
918    where
919        S: OneshotSource;
920    type RecordChunk = RecordChunkKind;
921
922    async fn split_work<S: OneshotSource + Send>(
923        &self,
924        source: S,
925        object: S::Object,
926        checksum: S::Checksum,
927    ) -> Result<Vec<Self::WorkRequest<S>>, StorageErrorX> {
928        match self {
929            FormatKind::Csv(csv) => {
930                let work = csv
931                    .split_work(source, object, checksum)
932                    .await
933                    .context("csv")?
934                    .into_iter()
935                    .map(RequestKind::Csv)
936                    .collect();
937                Ok(work)
938            }
939            FormatKind::Parquet(parquet) => {
940                let work = parquet
941                    .split_work(source, object, checksum)
942                    .await
943                    .context("parquet")?
944                    .into_iter()
945                    .map(RequestKind::Parquet)
946                    .collect();
947                Ok(work)
948            }
949        }
950    }
951
952    fn fetch_work<'a, S: OneshotSource + Sync + 'static>(
953        &'a self,
954        source: &'a S,
955        request: Self::WorkRequest<S>,
956    ) -> BoxStream<'a, Result<Self::RecordChunk, StorageErrorX>> {
957        match (self, request) {
958            (FormatKind::Csv(csv), RequestKind::Csv(request)) => csv
959                .fetch_work(source, request)
960                .map_ok(RecordChunkKind::Csv)
961                .map(|result| result.context("csv"))
962                .boxed(),
963            (FormatKind::Parquet(parquet), RequestKind::Parquet(request)) => parquet
964                .fetch_work(source, request)
965                .map_ok(RecordChunkKind::Parquet)
966                .map(|result| result.context("parquet"))
967                .boxed(),
968            (FormatKind::Parquet(_), RequestKind::Csv(_))
969            | (FormatKind::Csv(_), RequestKind::Parquet(_)) => {
970                unreachable!("programming error, {self:?}")
971            }
972        }
973    }
974
975    fn decode_chunk(
976        &self,
977        chunk: Self::RecordChunk,
978        rows: &mut Vec<Row>,
979    ) -> Result<usize, StorageErrorX> {
980        match (self, chunk) {
981            (FormatKind::Csv(csv), RecordChunkKind::Csv(chunk)) => {
982                csv.decode_chunk(chunk, rows).context("csv")
983            }
984            (FormatKind::Parquet(parquet), RecordChunkKind::Parquet(chunk)) => {
985                parquet.decode_chunk(chunk, rows).context("parquet")
986            }
987            (FormatKind::Parquet(_), RecordChunkKind::Csv(_))
988            | (FormatKind::Csv(_), RecordChunkKind::Parquet(_)) => {
989                unreachable!("programming error, {self:?}")
990            }
991        }
992    }
993}
994
/// Enum wrapper for [`OneshotFormat::WorkRequest`], see [`FormatKind`] for more details.
///
/// `O` and `C` are the source's object and checksum types; [`FormatKind`] instantiates this
/// as `RequestKind<S::Object, S::Checksum>`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub(crate) enum RequestKind<O, C> {
    /// Work request for [`FormatKind::Csv`].
    Csv(CsvWorkRequest<O, C>),
    /// Work request for [`FormatKind::Parquet`].
    Parquet(ParquetWorkRequest<O, C>),
}
1000
/// Enum wrapper for [`OneshotFormat::RecordChunk`], see [`FormatKind`] for more details.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub(crate) enum RecordChunkKind {
    /// Record chunk for [`FormatKind::Csv`].
    Csv(CsvRecord),
    /// Record chunk for [`FormatKind::Parquet`].
    Parquet(ParquetRowGroup),
}
1006
/// Decides which listed objects are included in an ingestion, based on the object's path.
pub(crate) enum ObjectFilter {
    /// Include every object.
    None,
    /// Include only objects whose path is exactly one of these entries.
    Files(BTreeSet<Box<str>>),
    /// Include only objects whose path matches this glob pattern.
    Pattern(glob::Pattern),
}
1012
1013impl ObjectFilter {
1014    pub fn try_new(filter: ContentFilter) -> Result<Self, anyhow::Error> {
1015        match filter {
1016            ContentFilter::None => Ok(ObjectFilter::None),
1017            ContentFilter::Files(files) => {
1018                let files = files.into_iter().map(|f| f.into()).collect();
1019                Ok(ObjectFilter::Files(files))
1020            }
1021            ContentFilter::Pattern(pattern) => {
1022                let pattern = glob::Pattern::new(&pattern)?;
1023                Ok(ObjectFilter::Pattern(pattern))
1024            }
1025        }
1026    }
1027
1028    /// Returns if the object should be included.
1029    pub fn filter<S: OneshotSource>(&self, object: &S::Object) -> bool {
1030        match self {
1031            ObjectFilter::None => true,
1032            ObjectFilter::Files(files) => files.contains(object.path()),
1033            ObjectFilter::Pattern(pattern) => pattern.matches(object.path()),
1034        }
1035    }
1036}
1037
/// Experimental Error Type.
///
/// The goal of this type is to combine concepts from both `thiserror` and
/// `anyhow`. Having "strongly typed" errors, as with `thiserror`, is useful for
/// determining what action to take, while tracking the context of an error, as
/// `anyhow` does, is useful for determining where an error came from.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct StorageErrorX {
    /// What went wrong.
    kind: StorageErrorXKind,
    /// Context labels describing where the error came from.
    ///
    /// Adding context to an existing `StorageErrorX` pushes the new label to the back, so the
    /// innermost label comes first.
    context: LinkedList<String>,
}
1049
1050impl fmt::Display for StorageErrorX {
1051    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1052        writeln!(f, "error: {}", self.kind)?;
1053        writeln!(f, "causes: {:?}", self.context)?;
1054        Ok(())
1055    }
1056}
1057
/// Experimental Error Type, see [`StorageErrorX`].
///
/// Variants carry pre-rendered messages (`Arc<str>` or `String`) rather than the underlying
/// error values. The `From` impls below convert source errors to strings, and this is what
/// lets the enum derive `Clone`, `Serialize` and `Deserialize`.
#[derive(Debug, Clone, Serialize, Deserialize, thiserror::Error)]
pub enum StorageErrorXKind {
    #[error("csv decoding error: {0}")]
    CsvDecoding(Arc<str>),
    #[error("parquet error: {0}")]
    ParquetError(Arc<str>),
    #[error("reqwest error: {0}")]
    Reqwest(Arc<str>),
    #[error("aws s3 request error: {0}")]
    AwsS3Request(String),
    #[error("aws s3 bytestream error: {0}")]
    AwsS3Bytes(Arc<str>),
    #[error("invalid reqwest header: {0}")]
    InvalidHeader(Arc<str>),
    #[error("failed to decode Row from a record batch: {0}")]
    InvalidRecordBatch(Arc<str>),
    #[error("programming error: {0}")]
    ProgrammingError(Arc<str>),
    #[error("failed to get the size of an object")]
    MissingSize,
    /// Carries the name of the missing field.
    #[error("object is missing the required '{0}' field")]
    MissingField(Arc<str>),
    #[error("failed while evaluating the provided mfp: '{0}'")]
    MfpEvalError(Arc<str>),
    #[error("no matching files found at the given url")]
    NoMatchingFiles,
    /// Carries the HTTP status code the server responded with.
    #[error("server returned HTTP {0}; Redirects are not supported, use the final URL directly.")]
    Redirect(u16),
    #[error("something went wrong: {0}")]
    Generic(String),
}
1090
1091impl From<csv_async::Error> for StorageErrorXKind {
1092    fn from(err: csv_async::Error) -> Self {
1093        StorageErrorXKind::CsvDecoding(err.to_string().into())
1094    }
1095}
1096
1097impl From<reqwest::Error> for StorageErrorXKind {
1098    fn from(err: reqwest::Error) -> Self {
1099        // Walk the source chain so that inner causes (like the custom DNS
1100        // resolver's "Address resolved to a private IP" rejection) are
1101        // visible. reqwest's top-level Display only says "error sending
1102        // request for url ..." and hides the underlying cause.
1103        StorageErrorXKind::Reqwest(err.to_string_with_causes().into())
1104    }
1105}
1106
1107impl From<reqwest::header::ToStrError> for StorageErrorXKind {
1108    fn from(err: reqwest::header::ToStrError) -> Self {
1109        StorageErrorXKind::InvalidHeader(err.to_string().into())
1110    }
1111}
1112
1113impl From<aws_smithy_types::byte_stream::error::Error> for StorageErrorXKind {
1114    fn from(err: aws_smithy_types::byte_stream::error::Error) -> Self {
1115        StorageErrorXKind::AwsS3Bytes(DisplayErrorContext(err).to_string().into())
1116    }
1117}
1118
1119impl From<::parquet::errors::ParquetError> for StorageErrorXKind {
1120    fn from(err: ::parquet::errors::ParquetError) -> Self {
1121        StorageErrorXKind::ParquetError(err.to_string().into())
1122    }
1123}
1124
1125impl StorageErrorXKind {
1126    pub fn with_context<C: Display>(self, context: C) -> StorageErrorX {
1127        StorageErrorX {
1128            kind: self,
1129            context: LinkedList::from([context.to_string()]),
1130        }
1131    }
1132
1133    pub fn invalid_record_batch<S: Into<Arc<str>>>(error: S) -> StorageErrorXKind {
1134        StorageErrorXKind::InvalidRecordBatch(error.into())
1135    }
1136
1137    pub fn generic<C: Display>(error: C) -> StorageErrorXKind {
1138        StorageErrorXKind::Generic(error.to_string())
1139    }
1140
1141    pub fn programming_error<S: Into<Arc<str>>>(error: S) -> StorageErrorXKind {
1142        StorageErrorXKind::ProgrammingError(error.into())
1143    }
1144}
1145
1146impl<E> From<E> for StorageErrorX
1147where
1148    E: Into<StorageErrorXKind>,
1149{
1150    fn from(err: E) -> Self {
1151        StorageErrorX {
1152            kind: err.into(),
1153            context: LinkedList::new(),
1154        }
1155    }
1156}
1157
/// Extension trait for attaching a context label to the error of a [`Result`], producing a
/// [`StorageErrorX`].
trait StorageErrorXContext<T> {
    /// Attaches `context` to the error case, if there is one.
    ///
    /// The implementations in this file pass `Ok` values through unchanged.
    fn context<C>(self, context: C) -> Result<T, StorageErrorX>
    where
        C: Display;
}
1163
1164impl<T, E> StorageErrorXContext<T> for Result<T, E>
1165where
1166    E: Into<StorageErrorXKind>,
1167{
1168    fn context<C>(self, context: C) -> Result<T, StorageErrorX>
1169    where
1170        C: Display,
1171    {
1172        match self {
1173            Ok(val) => Ok(val),
1174            Err(kind) => Err(StorageErrorX {
1175                kind: kind.into(),
1176                context: LinkedList::from([context.to_string()]),
1177            }),
1178        }
1179    }
1180}
1181
1182impl<T> StorageErrorXContext<T> for Result<T, StorageErrorX> {
1183    fn context<C>(self, context: C) -> Result<T, StorageErrorX>
1184    where
1185        C: Display,
1186    {
1187        match self {
1188            Ok(val) => Ok(val),
1189            Err(mut e) => {
1190                e.context.push_back(context.to_string());
1191                Err(e)
1192            }
1193        }
1194    }
1195}