//! "Oneshot" sources: render a dataflow that reads data from an external
//! system a single time and stages it as a [`ProtoBatch`] in Persist.
//!
//! The rendered dataflow is a linear chain of async operators: discover
//! objects, split work, fetch work, decode chunks, stage batches, and finally
//! a completion operator that reports the result back to the caller.
//!
//! Sources of data (e.g. HTTP servers, AWS S3) implement [`OneshotSource`],
//! and data formats (e.g. CSV, Parquet) implement [`OneshotFormat`].

use std::collections::{BTreeSet, LinkedList};
use std::fmt;
use std::fmt::{Debug, Display};
use std::future::Future;
use std::sync::Arc;

use bytes::Bytes;
use differential_dataflow::Hashable;
use futures::stream::BoxStream;
use futures::{StreamExt, TryStreamExt};
use mz_ore::cast::CastFrom;
use mz_persist_client::batch::ProtoBatch;
use mz_persist_client::cache::PersistClientCache;
use mz_persist_client::Diagnostics;
use mz_persist_types::codec_impls::UnitSchema;
use mz_repr::{Diff, GlobalId, Row, Timestamp};
use mz_storage_types::connections::ConnectionContext;
use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::oneshot_sources::{
    ContentFilter, ContentFormat, ContentSource, OneshotIngestionRequest,
};
use mz_storage_types::sources::SourceData;
use mz_timely_util::builder_async::{
    Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
};
use mz_timely_util::pact::Distribute;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use timely::container::CapacityContainerBuilder;
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::{Scope, Stream as TimelyStream};
use timely::progress::Antichain;
use tracing::info;

use crate::oneshot_source::aws_source::{AwsS3Source, S3Checksum, S3Object};
use crate::oneshot_source::csv::{CsvDecoder, CsvRecord, CsvWorkRequest};
use crate::oneshot_source::http_source::{HttpChecksum, HttpObject, HttpOneshotSource};
use crate::oneshot_source::parquet::{ParquetFormat, ParquetRowGroup, ParquetWorkRequest};

pub mod aws_source;
pub mod csv;
pub mod http_source;
pub mod parquet;

mod util;
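
/// Renders a dataflow that performs a oneshot ingestion: it reads the data
/// described by `request` and stages it as a [`ProtoBatch`] for the
/// destination collection identified by `collection_id` and `collection_meta`.
///
/// Each worker reports its result (a staged batch, `None`, or an error) via
/// `worker_callback`. Dropping the returned [`PressOnDropButton`]s shuts the
/// dataflow down.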
pub fn render<G, F>(
scope: G,
persist_clients: Arc<PersistClientCache>,
connection_context: ConnectionContext,
collection_id: GlobalId,
collection_meta: CollectionMetadata,
request: OneshotIngestionRequest,
worker_callback: F,
) -> Vec<PressOnDropButton>
where
G: Scope<Timestamp = Timestamp>,
    F: FnOnce(Result<Option<ProtoBatch>, String>) + 'static,
{
let OneshotIngestionRequest {
source,
format,
filter,
} = request;
let source = match source {
ContentSource::Http { url } => {
let source = HttpOneshotSource::new(reqwest::Client::default(), url);
SourceKind::Http(source)
}
ContentSource::AwsS3 {
connection,
connection_id,
uri,
} => {
let source = AwsS3Source::new(connection, connection_id, connection_context, uri);
SourceKind::AwsS3(source)
}
};
tracing::info!(?source, "created oneshot source");
let format = match format {
ContentFormat::Csv(params) => {
let format = CsvDecoder::new(params, &collection_meta.relation_desc);
FormatKind::Csv(format)
}
ContentFormat::Parquet => {
let format = ParquetFormat::new(collection_meta.relation_desc.clone());
FormatKind::Parquet(format)
}
};
let (objects_stream, discover_token) =
render_discover_objects(scope.clone(), collection_id, source.clone(), filter);
let (work_stream, split_token) = render_split_work(
scope.clone(),
collection_id,
&objects_stream,
source.clone(),
format.clone(),
);
let (records_stream, fetch_token) = render_fetch_work(
scope.clone(),
collection_id,
source.clone(),
format.clone(),
&work_stream,
);
let (rows_stream, decode_token) =
render_decode_chunk(scope.clone(), format.clone(), &records_stream);
let (batch_stream, batch_token) = render_stage_batches_operator(
scope.clone(),
collection_id,
&collection_meta,
persist_clients,
&rows_stream,
);
render_completion_operator(scope, &batch_stream, worker_callback);
    vec![
        discover_token,
        split_token,
        fetch_token,
        decode_token,
        batch_token,
    ]
}
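
/// Lists the objects available from `source`, applying `filter`, and emits the
/// matching `(object, checksum)` pairs.
///
/// Listing runs on a single worker, chosen by hashing `collection_id`; the
/// results are distributed across workers by the downstream operator.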
pub fn render_discover_objects<G, S>(
scope: G,
collection_id: GlobalId,
source: S,
filter: ContentFilter,
) -> (
TimelyStream<G, Result<(S::Object, S::Checksum), StorageErrorX>>,
PressOnDropButton,
)
where
G: Scope<Timestamp = Timestamp>,
S: OneshotSource + 'static,
{
let worker_id = scope.index();
let num_workers = scope.peers();
let active_worker_id = usize::cast_from((collection_id, "discover").hashed()) % num_workers;
let is_active_worker = worker_id == active_worker_id;
let mut builder = AsyncOperatorBuilder::new("CopyFrom-discover".to_string(), scope.clone());
let (start_handle, start_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
let shutdown = builder.build(move |caps| async move {
let [start_cap] = caps.try_into().unwrap();
if !is_active_worker {
return;
}
let filter = match ObjectFilter::try_new(filter) {
Ok(filter) => filter,
Err(err) => {
tracing::warn!(?err, "failed to create filter");
start_handle.give(&start_cap, Err(StorageErrorXKind::generic(err).into()));
return;
}
};
let work = source.list().await.context("list");
match work {
Ok(objects) => {
let names = objects.iter().map(|(o, _check)| o.name());
let found: String = itertools::intersperse(names, ", ").collect();
tracing::info!(%worker_id, %found, "listed objects");
let filtered: Vec<_> = objects
.into_iter()
.filter(|(o, _check)| filter.filter::<S>(o))
.collect();
let names = filtered.iter().map(|(o, _check)| o.name());
let returning: String = itertools::intersperse(names, ", ").collect();
tracing::info!(%worker_id, %returning, "filtered objects");
filtered
.into_iter()
.for_each(|object| start_handle.give(&start_cap, Ok(object)))
}
Err(err) => {
tracing::warn!(?err, "failed to list oneshot source");
start_handle.give(&start_cap, Err(err))
}
}
});
(start_stream, shutdown.press_on_drop())
}
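
/// Splits each discovered object into format-specific work requests that can
/// be fetched independently (and thus in parallel) by downstream workers.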
pub fn render_split_work<G, S, F>(
scope: G,
collection_id: GlobalId,
objects: &TimelyStream<G, Result<(S::Object, S::Checksum), StorageErrorX>>,
source: S,
format: F,
) -> (
TimelyStream<G, Result<F::WorkRequest<S>, StorageErrorX>>,
PressOnDropButton,
)
where
G: Scope,
S: OneshotSource + Send + Sync + 'static,
F: OneshotFormat + Send + Sync + 'static,
{
let worker_id = scope.index();
let mut builder = AsyncOperatorBuilder::new("CopyFrom-split_work".to_string(), scope.clone());
let (request_handle, request_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
let mut objects_handle = builder.new_input_for(objects, Distribute, &request_handle);
let shutdown = builder.build(move |caps| async move {
let [_objects_cap] = caps.try_into().unwrap();
info!(%collection_id, %worker_id, "CopyFrom Split Work");
while let Some(event) = objects_handle.next().await {
let (capability, maybe_objects) = match event {
AsyncEvent::Data(cap, req) => (cap, req),
AsyncEvent::Progress(_) => continue,
};
let result = async {
let mut requests = Vec::new();
for maybe_object in maybe_objects {
let (object, checksum) = maybe_object?;
let format_ = format.clone();
let source_ = source.clone();
let work_requests = mz_ore::task::spawn(|| "split-work", async move {
info!(%worker_id, object = %object.name(), "splitting object");
format_.split_work(source_.clone(), object, checksum).await
})
.await
.expect("failed to spawn task")?;
requests.extend(work_requests);
}
Ok::<_, StorageErrorX>(requests)
}
.await
.context("split");
match result {
Ok(requests) => requests
.into_iter()
.for_each(|req| request_handle.give(&capability, Ok(req))),
Err(err) => request_handle.give(&capability, Err(err)),
}
}
});
(request_stream, shutdown.press_on_drop())
}
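
/// Fetches the data for each work request from `source`, emitting the raw
/// record chunks produced by `format`.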
pub fn render_fetch_work<G, S, F>(
scope: G,
collection_id: GlobalId,
source: S,
format: F,
work_requests: &TimelyStream<G, Result<F::WorkRequest<S>, StorageErrorX>>,
) -> (
TimelyStream<G, Result<F::RecordChunk, StorageErrorX>>,
PressOnDropButton,
)
where
G: Scope,
S: OneshotSource + Sync + 'static,
F: OneshotFormat + Sync + 'static,
{
let worker_id = scope.index();
let mut builder = AsyncOperatorBuilder::new("CopyFrom-fetch_work".to_string(), scope.clone());
let (record_handle, record_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
let mut work_requests_handle = builder.new_input_for(work_requests, Distribute, &record_handle);
let shutdown = builder.build(move |caps| async move {
let [_work_cap] = caps.try_into().unwrap();
info!(%collection_id, %worker_id, "CopyFrom Fetch Work");
while let Some(event) = work_requests_handle.next().await {
let (capability, maybe_requests) = match event {
AsyncEvent::Data(cap, req) => (cap, req),
AsyncEvent::Progress(_) => continue,
};
let result = async {
for maybe_request in maybe_requests {
let request = maybe_request?;
let mut work_stream = format.fetch_work(&source, request);
while let Some(result) = work_stream.next().await {
let record_chunk = result.context("fetch worker")?;
record_handle.give(&capability, Ok(record_chunk));
}
}
Ok::<_, StorageErrorX>(())
}
.await
.context("fetch work");
if let Err(err) = result {
tracing::warn!(?err, "failed to fetch");
record_handle.give(&capability, Err(err))
}
}
});
(record_stream, shutdown.press_on_drop())
}
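
/// Decodes chunks of records into [`Row`]s using `format`.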
pub fn render_decode_chunk<G, F>(
scope: G,
format: F,
record_chunks: &TimelyStream<G, Result<F::RecordChunk, StorageErrorX>>,
) -> (
TimelyStream<G, Result<Row, StorageErrorX>>,
PressOnDropButton,
)
where
G: Scope,
F: OneshotFormat + 'static,
{
let mut builder = AsyncOperatorBuilder::new("CopyFrom-decode_chunk".to_string(), scope.clone());
let (row_handle, row_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
let mut record_chunk_handle = builder.new_input_for(record_chunks, Distribute, &row_handle);
let shutdown = builder.build(move |caps| async move {
let [_row_cap] = caps.try_into().unwrap();
while let Some(event) = record_chunk_handle.next().await {
let (capability, maybe_chunks) = match event {
AsyncEvent::Data(cap, data) => (cap, data),
AsyncEvent::Progress(_) => continue,
};
let result = async {
let mut rows = Vec::new();
for maybe_chunk in maybe_chunks {
let chunk = maybe_chunk?;
format.decode_chunk(chunk, &mut rows)?;
}
Ok::<_, StorageErrorX>(rows)
}
.await
.context("decode chunk");
match result {
Ok(rows) => rows
.into_iter()
.for_each(|row| row_handle.give(&capability, Ok(row))),
Err(err) => row_handle.give(&capability, Err(err)),
}
}
});
(row_stream, shutdown.press_on_drop())
}
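
/// Collects the decoded [`Row`]s into a staged Persist batch and emits the
/// resulting [`ProtoBatch`] (which still needs to be appended to the shard).
///
/// If an upstream error is observed, the partially built batch is deleted and
/// the error is forwarded instead.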
pub fn render_stage_batches_operator<G>(
scope: G,
collection_id: GlobalId,
collection_meta: &CollectionMetadata,
persist_clients: Arc<PersistClientCache>,
rows_stream: &TimelyStream<G, Result<Row, StorageErrorX>>,
) -> (
TimelyStream<G, Result<ProtoBatch, StorageErrorX>>,
PressOnDropButton,
)
where
G: Scope,
{
let persist_location = collection_meta.persist_location.clone();
let shard_id = collection_meta.data_shard;
let collection_desc = collection_meta.relation_desc.clone();
let mut builder =
AsyncOperatorBuilder::new("CopyFrom-stage_batches".to_string(), scope.clone());
let (proto_batch_handle, proto_batch_stream) =
builder.new_output::<CapacityContainerBuilder<_>>();
let mut rows_handle = builder.new_input_for(rows_stream, Pipeline, &proto_batch_handle);
let shutdown = builder.build(move |caps| async move {
let [proto_batch_cap] = caps.try_into().unwrap();
let persist_client = persist_clients
.open(persist_location)
.await
.expect("failed to open Persist client");
let persist_diagnostics = Diagnostics {
shard_name: collection_id.to_string(),
handle_purpose: "CopyFrom::stage_batches".to_string(),
};
let write_handle = persist_client
.open_writer::<SourceData, (), mz_repr::Timestamp, Diff>(
shard_id,
Arc::new(collection_desc),
Arc::new(UnitSchema),
persist_diagnostics,
)
.await
.expect("could not open Persist shard");
let lower = mz_repr::Timestamp::MIN;
let upper = Antichain::from_elem(lower.step_forward());
let mut batch_builder = write_handle.builder(Antichain::from_elem(lower));
while let Some(event) = rows_handle.next().await {
let AsyncEvent::Data(_, row_batch) = event else {
continue;
};
for maybe_row in row_batch {
match maybe_row {
Ok(row) => {
let data = SourceData(Ok(row));
batch_builder
.add(&data, &(), &lower, &1)
.await
.expect("failed to add Row to batch");
}
Err(err) => {
let batch = batch_builder
.finish(upper)
.await
.expect("failed to cleanup batch");
batch.delete().await;
proto_batch_handle
.give(&proto_batch_cap, Err(err).context("stage batches"));
return;
}
}
}
}
let batch = batch_builder
.finish(upper)
.await
.expect("failed to create Batch");
let proto_batch = batch.into_transmittable_batch();
proto_batch_handle.give(&proto_batch_cap, Ok(proto_batch));
});
(proto_batch_stream, shutdown.press_on_drop())
}
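
/// Observes the final result of the dataflow on each worker and reports it via
/// `worker_callback`.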
pub fn render_completion_operator<G, F>(
scope: G,
results_stream: &TimelyStream<G, Result<ProtoBatch, StorageErrorX>>,
worker_callback: F,
) where
G: Scope,
    F: FnOnce(Result<Option<ProtoBatch>, String>) + 'static,
{
let mut builder = AsyncOperatorBuilder::new("CopyFrom-completion".to_string(), scope.clone());
let mut results_input = builder.new_disconnected_input(results_stream, Pipeline);
builder.build(move |_| async move {
let result = async move {
let mut maybe_payload: Option<ProtoBatch> = None;
while let Some(event) = results_input.next().await {
if let AsyncEvent::Data(_cap, results) = event {
let [result] = results
.try_into()
.expect("only 1 event on the result stream");
if maybe_payload.is_some() {
panic!("expected only one batch!");
}
maybe_payload = Some(result.map_err(|e| e.to_string())?);
}
}
Ok(maybe_payload)
}
.await;
worker_callback(result);
});
}
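
/// Metadata about a single object (e.g. a file) that a [`OneshotSource`] can
/// fetch.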
pub trait OneshotObject {
    /// Name of the object.
    fn name(&self) -> &str;
    /// Size of the object in bytes.
    fn size(&self) -> usize;
    /// Encodings (e.g. compression) applied to the object.
    fn encodings(&self) -> &[Encoding];
}
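
/// Compression encodings an object may be stored with.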
#[derive(Debug, Copy, Clone, Serialize, Deserialize)]
pub enum Encoding {
Bzip2,
Gzip,
Xz,
Zstd,
}
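
/// An external system (e.g. an HTTP server or an S3 bucket) from which objects
/// can be listed and fetched.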
pub trait OneshotSource: Clone + Send + Unpin {
    /// An individual object (e.g. a file) within this source.
    type Object: OneshotObject
        + Debug
        + Clone
        + Send
        + Unpin
        + Serialize
        + DeserializeOwned
        + 'static;
    /// Checksum associated with a [`Self::Object`].
    type Checksum: Debug + Clone + Send + Unpin + Serialize + DeserializeOwned + 'static;

    /// Lists all of the objects, and their checksums, available from this source.
    fn list<'a>(
        &'a self,
    ) -> impl Future<Output = Result<Vec<(Self::Object, Self::Checksum)>, StorageErrorX>> + Send;

    /// Fetches `object`, optionally restricted to the provided byte `range`.
    fn get<'s>(
        &'s self,
        object: Self::Object,
        checksum: Self::Checksum,
        range: Option<std::ops::RangeInclusive<usize>>,
    ) -> BoxStream<'s, Result<Bytes, StorageErrorX>>;
}
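
/// Enum wrapper around all supported [`OneshotSource`]s, letting the rendered
/// dataflow dispatch to the correct implementation at runtime.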
#[derive(Clone, Debug)]
pub(crate) enum SourceKind {
Http(HttpOneshotSource),
AwsS3(AwsS3Source),
}
impl OneshotSource for SourceKind {
type Object = ObjectKind;
type Checksum = ChecksumKind;
async fn list<'a>(&'a self) -> Result<Vec<(Self::Object, Self::Checksum)>, StorageErrorX> {
match self {
SourceKind::Http(http) => {
let objects = http.list().await.context("http")?;
let objects = objects
.into_iter()
.map(|(object, checksum)| {
(ObjectKind::Http(object), ChecksumKind::Http(checksum))
})
.collect();
Ok(objects)
}
SourceKind::AwsS3(s3) => {
let objects = s3.list().await.context("s3")?;
let objects = objects
.into_iter()
.map(|(object, checksum)| {
(ObjectKind::AwsS3(object), ChecksumKind::AwsS3(checksum))
})
.collect();
Ok(objects)
}
}
}
fn get<'s>(
&'s self,
object: Self::Object,
checksum: Self::Checksum,
range: Option<std::ops::RangeInclusive<usize>>,
) -> BoxStream<'s, Result<Bytes, StorageErrorX>> {
match (self, object, checksum) {
(SourceKind::Http(http), ObjectKind::Http(object), ChecksumKind::Http(checksum)) => {
http.get(object, checksum, range)
.map(|result| result.context("http"))
.boxed()
}
(SourceKind::AwsS3(s3), ObjectKind::AwsS3(object), ChecksumKind::AwsS3(checksum)) => s3
.get(object, checksum, range)
.map(|result| result.context("aws_s3"))
.boxed(),
(SourceKind::AwsS3(_) | SourceKind::Http(_), _, _) => {
unreachable!("programming error! wrong source, object, and checksum kind");
}
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) enum ObjectKind {
Http(HttpObject),
AwsS3(S3Object),
}
impl OneshotObject for ObjectKind {
fn name(&self) -> &str {
match self {
ObjectKind::Http(object) => object.name(),
ObjectKind::AwsS3(object) => object.name(),
}
}
fn size(&self) -> usize {
match self {
ObjectKind::Http(object) => object.size(),
ObjectKind::AwsS3(object) => object.size(),
}
}
fn encodings(&self) -> &[Encoding] {
match self {
ObjectKind::Http(object) => object.encodings(),
ObjectKind::AwsS3(object) => object.encodings(),
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) enum ChecksumKind {
Http(HttpChecksum),
AwsS3(S3Checksum),
}
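
/// A data format (e.g. CSV, Parquet) that a oneshot ingestion can decode into
/// [`Row`]s.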
pub trait OneshotFormat: Clone {
    /// A single unit of work that can be fetched independently of all others.
    type WorkRequest<S>: Debug + Clone + Send + Serialize + DeserializeOwned + 'static
    where
        S: OneshotSource;
    /// A chunk of records in this format that can be decoded into [`Row`]s.
    type RecordChunk: Debug + Clone + Send + Serialize + DeserializeOwned + 'static;

    /// Splits a single object into work requests that can be fetched in parallel.
    fn split_work<S: OneshotSource + Send>(
        &self,
        source: S,
        object: S::Object,
        checksum: S::Checksum,
    ) -> impl Future<Output = Result<Vec<Self::WorkRequest<S>>, StorageErrorX>> + Send;

    /// Fetches the data for a single work request from `source` as a stream of
    /// record chunks.
    fn fetch_work<'a, S: OneshotSource + Sync + 'static>(
        &'a self,
        source: &'a S,
        request: Self::WorkRequest<S>,
    ) -> BoxStream<'a, Result<Self::RecordChunk, StorageErrorX>>;

    /// Decodes a chunk of records, appending the decoded [`Row`]s to `rows` and
    /// returning how many were appended.
    fn decode_chunk(
        &self,
        chunk: Self::RecordChunk,
        rows: &mut Vec<Row>,
    ) -> Result<usize, StorageErrorX>;
}
#[derive(Clone, Debug)]
pub(crate) enum FormatKind {
Csv(CsvDecoder),
Parquet(ParquetFormat),
}
impl OneshotFormat for FormatKind {
type WorkRequest<S>
= RequestKind<S::Object, S::Checksum>
where
S: OneshotSource;
type RecordChunk = RecordChunkKind;
async fn split_work<S: OneshotSource + Send>(
&self,
source: S,
object: S::Object,
checksum: S::Checksum,
) -> Result<Vec<Self::WorkRequest<S>>, StorageErrorX> {
match self {
FormatKind::Csv(csv) => {
let work = csv
.split_work(source, object, checksum)
.await
.context("csv")?
.into_iter()
.map(RequestKind::Csv)
.collect();
Ok(work)
}
FormatKind::Parquet(parquet) => {
let work = parquet
.split_work(source, object, checksum)
.await
.context("parquet")?
.into_iter()
.map(RequestKind::Parquet)
.collect();
Ok(work)
}
}
}
fn fetch_work<'a, S: OneshotSource + Sync + 'static>(
&'a self,
source: &'a S,
request: Self::WorkRequest<S>,
) -> BoxStream<'a, Result<Self::RecordChunk, StorageErrorX>> {
match (self, request) {
(FormatKind::Csv(csv), RequestKind::Csv(request)) => csv
.fetch_work(source, request)
.map_ok(RecordChunkKind::Csv)
.map(|result| result.context("csv"))
.boxed(),
(FormatKind::Parquet(parquet), RequestKind::Parquet(request)) => parquet
.fetch_work(source, request)
.map_ok(RecordChunkKind::Parquet)
.map(|result| result.context("parquet"))
.boxed(),
(FormatKind::Parquet(_), RequestKind::Csv(_))
| (FormatKind::Csv(_), RequestKind::Parquet(_)) => {
unreachable!("programming error, {self:?}")
}
}
}
fn decode_chunk(
&self,
chunk: Self::RecordChunk,
rows: &mut Vec<Row>,
) -> Result<usize, StorageErrorX> {
match (self, chunk) {
(FormatKind::Csv(csv), RecordChunkKind::Csv(chunk)) => {
csv.decode_chunk(chunk, rows).context("csv")
}
(FormatKind::Parquet(parquet), RecordChunkKind::Parquet(chunk)) => {
parquet.decode_chunk(chunk, rows).context("parquet")
}
(FormatKind::Parquet(_), RecordChunkKind::Csv(_))
| (FormatKind::Csv(_), RecordChunkKind::Parquet(_)) => {
unreachable!("programming error, {self:?}")
}
}
}
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub(crate) enum RequestKind<O, C> {
Csv(CsvWorkRequest<O, C>),
Parquet(ParquetWorkRequest<O, C>),
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub(crate) enum RecordChunkKind {
Csv(CsvRecord),
Parquet(ParquetRowGroup),
}
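
/// Filter that determines which objects discovered by a [`OneshotSource`]
/// should actually be ingested.
///
/// For example (illustrative only; `MySource` and `object` are placeholders),
/// a `ContentFilter::Pattern` becomes a glob matched against object names:
///
/// ```ignore
/// let filter = ObjectFilter::try_new(ContentFilter::Pattern("*.csv".into()))?;
/// assert!(filter.filter::<MySource>(&object)); // true for e.g. "data.csv"
/// ```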
pub(crate) enum ObjectFilter {
None,
Files(BTreeSet<Box<str>>),
Pattern(glob::Pattern),
}
impl ObjectFilter {
pub fn try_new(filter: ContentFilter) -> Result<Self, anyhow::Error> {
match filter {
ContentFilter::None => Ok(ObjectFilter::None),
ContentFilter::Files(files) => {
let files = files.into_iter().map(|f| f.into()).collect();
Ok(ObjectFilter::Files(files))
}
ContentFilter::Pattern(pattern) => {
let pattern = glob::Pattern::new(&pattern)?;
Ok(ObjectFilter::Pattern(pattern))
}
}
}
pub fn filter<S: OneshotSource>(&self, object: &S::Object) -> bool {
match self {
ObjectFilter::None => true,
ObjectFilter::Files(files) => files.contains(object.name()),
ObjectFilter::Pattern(pattern) => pattern.matches(object.name()),
}
}
}
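
/// Error type used throughout oneshot sources that pairs a
/// [`StorageErrorXKind`] with a list of human-readable context strings, in the
/// spirit of `anyhow::Error` (see [`StorageErrorXContext::context`]).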
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct StorageErrorX {
kind: StorageErrorXKind,
context: LinkedList<String>,
}
impl fmt::Display for StorageErrorX {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
writeln!(f, "error: {}", self.kind)?;
writeln!(f, "causes: {:?}", self.context)?;
Ok(())
}
}
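
/// The specific kind of error underlying a [`StorageErrorX`].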
#[derive(Debug, Clone, Serialize, Deserialize, thiserror::Error)]
pub enum StorageErrorXKind {
#[error("csv decoding error: {0}")]
CsvDecoding(Arc<str>),
#[error("parquet error: {0}")]
ParquetError(Arc<str>),
#[error("reqwest error: {0}")]
Reqwest(Arc<str>),
#[error("aws s3 request error: {0}")]
AwsS3Request(String),
#[error("aws s3 bytestream error: {0}")]
AwsS3Bytes(Arc<str>),
#[error("invalid reqwest header: {0}")]
InvalidHeader(Arc<str>),
#[error("failed to decode Row from a record batch: {0}")]
InvalidRecordBatch(Arc<str>),
#[error("programming error: {0}")]
ProgrammingError(Arc<str>),
#[error("failed to get the size of an object")]
MissingSize,
#[error("object is missing the required '{0}' field")]
MissingField(Arc<str>),
#[error("something went wrong: {0}")]
Generic(String),
}
impl From<csv_async::Error> for StorageErrorXKind {
fn from(err: csv_async::Error) -> Self {
StorageErrorXKind::CsvDecoding(err.to_string().into())
}
}
impl From<reqwest::Error> for StorageErrorXKind {
fn from(err: reqwest::Error) -> Self {
StorageErrorXKind::Reqwest(err.to_string().into())
}
}
impl From<reqwest::header::ToStrError> for StorageErrorXKind {
fn from(err: reqwest::header::ToStrError) -> Self {
StorageErrorXKind::InvalidHeader(err.to_string().into())
}
}
impl From<aws_smithy_types::byte_stream::error::Error> for StorageErrorXKind {
fn from(err: aws_smithy_types::byte_stream::error::Error) -> Self {
StorageErrorXKind::AwsS3Request(err.to_string())
}
}
impl From<::parquet::errors::ParquetError> for StorageErrorXKind {
fn from(err: ::parquet::errors::ParquetError) -> Self {
StorageErrorXKind::ParquetError(err.to_string().into())
}
}
impl StorageErrorXKind {
pub fn with_context<C: Display>(self, context: C) -> StorageErrorX {
StorageErrorX {
kind: self,
context: LinkedList::from([context.to_string()]),
}
}
pub fn invalid_record_batch<S: Into<Arc<str>>>(error: S) -> StorageErrorXKind {
StorageErrorXKind::InvalidRecordBatch(error.into())
}
pub fn generic<C: Display>(error: C) -> StorageErrorXKind {
StorageErrorXKind::Generic(error.to_string())
}
pub fn programming_error<S: Into<Arc<str>>>(error: S) -> StorageErrorXKind {
StorageErrorXKind::ProgrammingError(error.into())
}
}
impl<E> From<E> for StorageErrorX
where
E: Into<StorageErrorXKind>,
{
fn from(err: E) -> Self {
StorageErrorX {
kind: err.into(),
context: LinkedList::new(),
}
}
}
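
/// Helper trait for attaching context to an error, in the spirit of
/// `anyhow::Context`. For example, `source.list().await.context("list")` wraps
/// any error kind into a [`StorageErrorX`] that records `"list"` as context.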
trait StorageErrorXContext<T> {
fn context<C>(self, context: C) -> Result<T, StorageErrorX>
where
C: Display;
}
impl<T, E> StorageErrorXContext<T> for Result<T, E>
where
E: Into<StorageErrorXKind>,
{
fn context<C>(self, context: C) -> Result<T, StorageErrorX>
where
C: Display,
{
match self {
Ok(val) => Ok(val),
Err(kind) => Err(StorageErrorX {
kind: kind.into(),
context: LinkedList::from([context.to_string()]),
}),
}
}
}
impl<T> StorageErrorXContext<T> for Result<T, StorageErrorX> {
fn context<C>(self, context: C) -> Result<T, StorageErrorX>
where
C: Display,
{
match self {
Ok(val) => Ok(val),
Err(mut e) => {
e.context.push_back(context.to_string());
Err(e)
}
}
}
}