pub(super) struct ParquetUploader {
desc: Arc<RelationDesc>,
next_file_index: usize,
key_manager: S3KeyManager,
batch: u64,
max_file_size: u64,
sdk_config: Arc<SdkConfig>,
row_group_size_bytes: u64,
arrow_builder_buffer_bytes: u64,
active_file: Option<ParquetFile>,
params: CopyToParameters,
}Expand description
A ParquetUploader that writes rows to parquet files and uploads them to S3.
Spawns all S3 operations in tokio tasks to avoid blocking the surrounding timely context.
§Buffering
There are several layers of buffering in this uploader:
-
The uploader will hold a
ParquetFileobject after the first row is added. ThisParquetFileholds anArrowBuilderand anArrowWriter. -
The
ArrowBuilderbuilds a structure of in-memorymz_arrow_util::builder::ColBuilders from incomingmz_repr::Rows. Eachmz_arrow_util::builder::ColBuilderholds a specificarrow::array::buildertype for constructing a column of the given type. The entireArrowBuilderis flushed to theParquetFile’sArrowWriterby converting it into aarrow::record_batch::RecordBatchonce we’ve given it more than the configured arrow_builder_buffer_bytes. -
The
ParquetFileholds aArrowWriterthat buffers until it has enough data to write a parquet ‘row group’. The ‘row group’ size is usually based on the number of rows (in the ArrowWriter), but we also force it to flush based on data-size (see below for more details). -
When a row group is written out, the active
ParquetFileprovides a reference to the row group buffer to itsS3MultiPartUploaderwhich will copy the data to its own buffer. If this upload buffer exceeds the configured part size limit, theS3MultiPartUploaderwill upload parts to S3 until the upload buffer is below the limit. -
When the
ParquetUploaderis finished, it will flush the activeParquetFilewhich will flush itsArrowBuilderand any open row groups to theS3MultiPartUploaderand upload the remaining parts to S3.
┌───────────────┐
│ mz_repr::Rows │
└───────┬───────┘
┌─────────────│───────────────────────────────────────────┐
│ │ ParquetFile │
│ ┌───────────▼─────────────┐ │
│ │ ArrowBuilder │ │
│ │ │ ┌──────────────────┐ │
│ │ Vec<ArrowColumn> │ │ ArrowWriter │ │
│ │ ┌─────────┐ ┌─────────┐ │ │ │ │
│ │ │ │ │ │ │ │ ┌──────────┐ │ │
│ │ │ColBuildr│ │ColBuildr│ ├────┼──►│ buffer │ │ │
│ │ │ │ │ │ │ │ └─────┬────┘ │ │
│ │ └─────────┘ └─────────┘ │ │ │ │ │
│ │ │ │ ┌─────▼────┐ │ │
│ └─────────────────────────┘ │ │ row group│ │ │
│ │ └─┬────────┘ │ │
│ │ │ │ │
│ └─────┼────────────┘ │
│ ┌──────┼────────────────┐ │
│ │ │ S3MultiPart│ │
│ │ ┌────▼─────┐ Uploader │ │
│ │ │ buffer │ │ │
│ ┌─────────┐ │ └───┬─────┬┘ │ │
│ │ S3 API │◄───────────────┤ │ │ │ │
│ └─────────┘ │ ┌───▼──┐ ┌▼─────┐ │ │
│ │ │ part │ │ part │ │ │
│ │ └──────┘ └──────┘ │ │
│ │ │ │
│ └───────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────┘§File Size & Buffer Sizes
We expose a ‘MAX FILE SIZE’ parameter to the user, but this is difficult to enforce exactly since we don’t know the exact size of the data we’re writing before a parquet row-group is flushed. This is because the encoded size of the data is different than the in-memory representation and because the data pages within each column in a row-group are compressed. We also don’t know the exact size of the parquet metadata that will be written to the file.
Therefore we don’t use the S3MultiPartUploader’s hard file size limit since it’s difficult to handle those errors after we’ve already flushed data to the ArrowWriter. Instead we implement a crude check ourselves.
This check aims to hit the max-size limit but may exceed it by some amount. To ensure that amount is small, we set the max row-group size to a configurable ratio (e.g. 20%) of the max_file_size. This determines how often we’ll flush a row-group, but is only an approximation since the actual size of the row-group is not known until it’s written. After each row-group is flushed, the size of the file is checked and if it’s exceeded max-file-size a new file is started.
We also set the max ArrowBuilder buffer size to a ratio (e.g. 150%) of the row-group size to avoid the ArrowWriter buffering too much data itself before flushing a row-group. We’re aiming for the encoded & compressed size of the ArrowBuilder data to be roughly equal to the row-group size, but this is only an approximation.
TODO: We may want to consider adding additional limits to the buffer sizes to avoid memory issues if the user sets the max file size to be very large.
Fields§
§desc: Arc<RelationDesc>The output description.
next_file_index: usizeThe index of the next file to upload within the batch.
key_manager: S3KeyManagerProvides the appropriate bucket and object keys to use for uploads.
batch: u64Identifies the batch that files uploaded by this uploader belong to.
max_file_size: u64The desired file size. A new file upload will be started when the size exceeds this amount.
sdk_config: Arc<SdkConfig>The aws sdk config.
row_group_size_bytes: u64§arrow_builder_buffer_bytes: u64§active_file: Option<ParquetFile>The active parquet file being written to, stored in an option since it won’t be initialized until the builder is first flushed, and to make it easier to take ownership when calling in spawned tokio tasks (to avoid doing I/O in the surrounding timely context).
params: CopyToParametersUpload and buffer params
Implementations§
Source§impl ParquetUploader
impl ParquetUploader
Sourceasync fn start_new_file(&mut self) -> Result<&mut ParquetFile, Error>
async fn start_new_file(&mut self) -> Result<&mut ParquetFile, Error>
Start a new parquet file for upload. Will finish the current file if one is active.
Trait Implementations§
Source§impl CopyToS3Uploader for ParquetUploader
impl CopyToS3Uploader for ParquetUploader
fn new( sdk_config: SdkConfig, connection_details: S3UploadInfo, sink_id: &GlobalId, batch: u64, params: CopyToParameters, ) -> Result<ParquetUploader, Error>
Source§async fn append_row(&mut self, row: &Row) -> Result<(), Error>
async fn append_row(&mut self, row: &Row) -> Result<(), Error>
Auto Trait Implementations§
impl Freeze for ParquetUploader
impl !RefUnwindSafe for ParquetUploader
impl Send for ParquetUploader
impl !Sync for ParquetUploader
impl Unpin for ParquetUploader
impl !UnwindSafe for ParquetUploader
Blanket Implementations§
Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Source§impl<T> Downcast for T
impl<T> Downcast for T
Source§impl<T> FmtForward for T
impl<T> FmtForward for T
Source§fn fmt_binary(self) -> FmtBinary<Self>where
Self: Binary,
fn fmt_binary(self) -> FmtBinary<Self>where
Self: Binary,
self to use its Binary implementation when Debug-formatted.Source§fn fmt_display(self) -> FmtDisplay<Self>where
Self: Display,
fn fmt_display(self) -> FmtDisplay<Self>where
Self: Display,
self to use its Display implementation when
Debug-formatted.Source§fn fmt_lower_exp(self) -> FmtLowerExp<Self>where
Self: LowerExp,
fn fmt_lower_exp(self) -> FmtLowerExp<Self>where
Self: LowerExp,
self to use its LowerExp implementation when
Debug-formatted.Source§fn fmt_lower_hex(self) -> FmtLowerHex<Self>where
Self: LowerHex,
fn fmt_lower_hex(self) -> FmtLowerHex<Self>where
Self: LowerHex,
self to use its LowerHex implementation when
Debug-formatted.Source§fn fmt_octal(self) -> FmtOctal<Self>where
Self: Octal,
fn fmt_octal(self) -> FmtOctal<Self>where
Self: Octal,
self to use its Octal implementation when Debug-formatted.Source§fn fmt_pointer(self) -> FmtPointer<Self>where
Self: Pointer,
fn fmt_pointer(self) -> FmtPointer<Self>where
Self: Pointer,
self to use its Pointer implementation when
Debug-formatted.Source§fn fmt_upper_exp(self) -> FmtUpperExp<Self>where
Self: UpperExp,
fn fmt_upper_exp(self) -> FmtUpperExp<Self>where
Self: UpperExp,
self to use its UpperExp implementation when
Debug-formatted.Source§fn fmt_upper_hex(self) -> FmtUpperHex<Self>where
Self: UpperHex,
fn fmt_upper_hex(self) -> FmtUpperHex<Self>where
Self: UpperHex,
self to use its UpperHex implementation when
Debug-formatted.Source§impl<T> FutureExt for T
impl<T> FutureExt for T
Source§fn with_context(self, otel_cx: Context) -> WithContext<Self>
fn with_context(self, otel_cx: Context) -> WithContext<Self>
Source§fn with_current_context(self) -> WithContext<Self>
fn with_current_context(self) -> WithContext<Self>
Source§impl<T> Instrument for T
impl<T> Instrument for T
Source§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
Source§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
Source§impl<T> IntoEither for T
impl<T> IntoEither for T
Source§fn into_either(self, into_left: bool) -> Either<Self, Self>
fn into_either(self, into_left: bool) -> Either<Self, Self>
self into a Left variant of Either<Self, Self>
if into_left is true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read moreSource§fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
self into a Left variant of Either<Self, Self>
if into_left(&self) returns true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read moreSource§impl<T> IntoRequest<T> for T
impl<T> IntoRequest<T> for T
Source§fn into_request(self) -> Request<T>
fn into_request(self) -> Request<T>
T in a tonic::RequestSource§impl<T, U> OverrideFrom<Option<&T>> for Uwhere
U: OverrideFrom<T>,
impl<T, U> OverrideFrom<Option<&T>> for Uwhere
U: OverrideFrom<T>,
Source§impl<T> Paint for Twhere
T: ?Sized,
impl<T> Paint for Twhere
T: ?Sized,
Source§fn fg(&self, value: Color) -> Painted<&T>
fn fg(&self, value: Color) -> Painted<&T>
Returns a styled value derived from self with the foreground set to
value.
This method should be used rarely. Instead, prefer to use color-specific
builder methods like red() and
green(), which have the same functionality but are
pithier.
§Example
Set foreground color to white using fg():
use yansi::{Paint, Color};
painted.fg(Color::White);Set foreground color to white using white().
use yansi::Paint;
painted.white();Source§fn bright_black(&self) -> Painted<&T>
fn bright_black(&self) -> Painted<&T>
Source§fn bright_red(&self) -> Painted<&T>
fn bright_red(&self) -> Painted<&T>
Source§fn bright_green(&self) -> Painted<&T>
fn bright_green(&self) -> Painted<&T>
Source§fn bright_yellow(&self) -> Painted<&T>
fn bright_yellow(&self) -> Painted<&T>
Source§fn bright_blue(&self) -> Painted<&T>
fn bright_blue(&self) -> Painted<&T>
Source§fn bright_magenta(&self) -> Painted<&T>
fn bright_magenta(&self) -> Painted<&T>
Source§fn bright_cyan(&self) -> Painted<&T>
fn bright_cyan(&self) -> Painted<&T>
Source§fn bright_white(&self) -> Painted<&T>
fn bright_white(&self) -> Painted<&T>
Source§fn bg(&self, value: Color) -> Painted<&T>
fn bg(&self, value: Color) -> Painted<&T>
Returns a styled value derived from self with the background set to
value.
This method should be used rarely. Instead, prefer to use color-specific
builder methods like on_red() and
on_green(), which have the same functionality but
are pithier.
§Example
Set background color to red using fg():
use yansi::{Paint, Color};
painted.bg(Color::Red);Set background color to red using on_red().
use yansi::Paint;
painted.on_red();Source§fn on_primary(&self) -> Painted<&T>
fn on_primary(&self) -> Painted<&T>
Source§fn on_magenta(&self) -> Painted<&T>
fn on_magenta(&self) -> Painted<&T>
Source§fn on_bright_black(&self) -> Painted<&T>
fn on_bright_black(&self) -> Painted<&T>
Source§fn on_bright_red(&self) -> Painted<&T>
fn on_bright_red(&self) -> Painted<&T>
Source§fn on_bright_green(&self) -> Painted<&T>
fn on_bright_green(&self) -> Painted<&T>
Source§fn on_bright_yellow(&self) -> Painted<&T>
fn on_bright_yellow(&self) -> Painted<&T>
Source§fn on_bright_blue(&self) -> Painted<&T>
fn on_bright_blue(&self) -> Painted<&T>
Source§fn on_bright_magenta(&self) -> Painted<&T>
fn on_bright_magenta(&self) -> Painted<&T>
Source§fn on_bright_cyan(&self) -> Painted<&T>
fn on_bright_cyan(&self) -> Painted<&T>
Source§fn on_bright_white(&self) -> Painted<&T>
fn on_bright_white(&self) -> Painted<&T>
Source§fn attr(&self, value: Attribute) -> Painted<&T>
fn attr(&self, value: Attribute) -> Painted<&T>
Enables the styling Attribute value.
This method should be used rarely. Instead, prefer to use
attribute-specific builder methods like bold() and
underline(), which have the same functionality
but are pithier.
§Example
Make text bold using attr():
use yansi::{Paint, Attribute};
painted.attr(Attribute::Bold);Make text bold using using bold().
use yansi::Paint;
painted.bold();Source§fn rapid_blink(&self) -> Painted<&T>
fn rapid_blink(&self) -> Painted<&T>
Source§fn quirk(&self, value: Quirk) -> Painted<&T>
fn quirk(&self, value: Quirk) -> Painted<&T>
Enables the yansi Quirk value.
This method should be used rarely. Instead, prefer to use quirk-specific
builder methods like mask() and
wrap(), which have the same functionality but are
pithier.
§Example
Enable wrapping using .quirk():
use yansi::{Paint, Quirk};
painted.quirk(Quirk::Wrap);Enable wrapping using wrap().
use yansi::Paint;
painted.wrap();Source§fn clear(&self) -> Painted<&T>
👎Deprecated since 1.0.1: renamed to resetting() due to conflicts with Vec::clear().
The clear() method will be removed in a future release.
fn clear(&self) -> Painted<&T>
resetting() due to conflicts with Vec::clear().
The clear() method will be removed in a future release.Source§fn whenever(&self, value: Condition) -> Painted<&T>
fn whenever(&self, value: Condition) -> Painted<&T>
Conditionally enable styling based on whether the Condition value
applies. Replaces any previous condition.
See the crate level docs for more details.
§Example
Enable styling painted only when both stdout and stderr are TTYs:
use yansi::{Paint, Condition};
painted.red().on_yellow().whenever(Condition::STDOUTERR_ARE_TTY);Source§impl<T> Pipe for Twhere
T: ?Sized,
impl<T> Pipe for Twhere
T: ?Sized,
Source§fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> Rwhere
Self: Sized,
fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> Rwhere
Self: Sized,
Source§fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> Rwhere
R: 'a,
fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> Rwhere
R: 'a,
self and passes that borrow into the pipe function. Read moreSource§fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> Rwhere
R: 'a,
fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> Rwhere
R: 'a,
self and passes that borrow into the pipe function. Read moreSource§fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> R
fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> R
Source§fn pipe_borrow_mut<'a, B, R>(
&'a mut self,
func: impl FnOnce(&'a mut B) -> R,
) -> R
fn pipe_borrow_mut<'a, B, R>( &'a mut self, func: impl FnOnce(&'a mut B) -> R, ) -> R
Source§fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> R
fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> R
self, then passes self.as_ref() into the pipe function.Source§fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> R
fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> R
self, then passes self.as_mut() into the pipe
function.Source§fn pipe_deref<'a, T, R>(&'a self, func: impl FnOnce(&'a T) -> R) -> R
fn pipe_deref<'a, T, R>(&'a self, func: impl FnOnce(&'a T) -> R) -> R
self, then passes self.deref() into the pipe function.Source§impl<T> Pointable for T
impl<T> Pointable for T
Source§impl<T> PolicyExt for Twhere
T: ?Sized,
impl<T> PolicyExt for Twhere
T: ?Sized,
Source§impl<P, R> ProtoType<R> for Pwhere
R: RustType<P>,
impl<P, R> ProtoType<R> for Pwhere
R: RustType<P>,
Source§fn into_rust(self) -> Result<R, TryFromProtoError>
fn into_rust(self) -> Result<R, TryFromProtoError>
RustType::from_proto.Source§fn from_rust(rust: &R) -> P
fn from_rust(rust: &R) -> P
RustType::into_proto.Source§impl<'a, S, T> Semigroup<&'a S> for Twhere
T: Semigroup<S>,
impl<'a, S, T> Semigroup<&'a S> for Twhere
T: Semigroup<S>,
Source§fn plus_equals(&mut self, rhs: &&'a S)
fn plus_equals(&mut self, rhs: &&'a S)
std::ops::AddAssign, for types that do not implement AddAssign.Source§impl<T> ServiceExt for T
impl<T> ServiceExt for T
Source§fn map_response_body<F>(self, f: F) -> MapResponseBody<Self, F>where
Self: Sized,
fn map_response_body<F>(self, f: F) -> MapResponseBody<Self, F>where
Self: Sized,
Source§fn decompression(self) -> Decompression<Self>where
Self: Sized,
fn decompression(self) -> Decompression<Self>where
Self: Sized,
Source§fn trace_for_http(self) -> Trace<Self, SharedClassifier<ServerErrorsAsFailures>>where
Self: Sized,
fn trace_for_http(self) -> Trace<Self, SharedClassifier<ServerErrorsAsFailures>>where
Self: Sized,
Source§fn trace_for_grpc(self) -> Trace<Self, SharedClassifier<GrpcErrorsAsFailures>>where
Self: Sized,
fn trace_for_grpc(self) -> Trace<Self, SharedClassifier<GrpcErrorsAsFailures>>where
Self: Sized,
Source§fn follow_redirects(self) -> FollowRedirect<Self>where
Self: Sized,
fn follow_redirects(self) -> FollowRedirect<Self>where
Self: Sized,
Source§impl<T> Tap for T
impl<T> Tap for T
Source§fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Self
fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Self
Borrow<B> of a value. Read moreSource§fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Self
fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Self
BorrowMut<B> of a value. Read moreSource§fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Self
fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Self
AsRef<R> view of a value. Read moreSource§fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Self
fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Self
AsMut<R> view of a value. Read moreSource§fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Self
fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Self
Deref::Target of a value. Read moreSource§fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Self
fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Self
Deref::Target of a value. Read moreSource§fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self
fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self
.tap() only in debug builds, and is erased in release builds.Source§fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self
fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self
.tap_mut() only in debug builds, and is erased in release
builds.Source§fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Self
fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Self
.tap_borrow() only in debug builds, and is erased in release
builds.Source§fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Self
fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Self
.tap_borrow_mut() only in debug builds, and is erased in release
builds.Source§fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Self
fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Self
.tap_ref() only in debug builds, and is erased in release
builds.Source§fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Self
fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Self
.tap_ref_mut() only in debug builds, and is erased in release
builds.Source§fn tap_deref_dbg<T>(self, func: impl FnOnce(&T)) -> Self
fn tap_deref_dbg<T>(self, func: impl FnOnce(&T)) -> Self
.tap_deref() only in debug builds, and is erased in release
builds.