pub(super) struct ParquetUploader {
    desc: Arc<RelationDesc>,
    next_file_index: usize,
    key_manager: S3KeyManager,
    batch: u64,
    max_file_size: u64,
    sdk_config: Arc<SdkConfig>,
    row_group_size_bytes: u64,
    arrow_builder_buffer_bytes: u64,
    active_file: Option<ParquetFile>,
    params: CopyToParameters,
}

An uploader that writes rows to Parquet files and uploads them to S3.

Spawns all S3 operations in tokio tasks to avoid blocking the surrounding timely context.

§Buffering

There are several layers of buffering in this uploader:

      ┌───────────────┐
      │ mz_repr::Rows │
      └───────┬───────┘
┌─────────────│───────────────────────────────────────────┐
│             │         ParquetFile                       │
│ ┌───────────▼─────────────┐                             │
│ │       ArrowBuilder      │                             │
│ │                         │    ┌──────────────────┐     │
│ │     Vec<ArrowColumn>    │    │    ArrowWriter   │     │
│ │ ┌─────────┐ ┌─────────┐ │    │                  │     │
│ │ │         │ │         │ │    │   ┌──────────┐   │     │
│ │ │ColBuildr│ │ColBuildr│ ├────┼──►│  buffer  │   │     │
│ │ │         │ │         │ │    │   └─────┬────┘   │     │
│ │ └─────────┘ └─────────┘ │    │         │        │     │
│ │                         │    │   ┌─────▼────┐   │     │
│ └─────────────────────────┘    │   │ row group│   │     │
│                                │   └─┬────────┘   │     │
│                                │     │            │     │
│                                └─────┼────────────┘     │
│                               ┌──────┼────────────────┐ │
│                               │      │     S3MultiPart│ │
│                               │ ┌────▼─────┐ Uploader │ │
│                               │ │  buffer  │          │ │
│    ┌─────────┐                │ └───┬─────┬┘          │ │
│    │ S3 API  │◄───────────────┤     │     │           │ │
│    └─────────┘                │ ┌───▼──┐ ┌▼─────┐     │ │
│                               │ │ part │ │ part │     │ │
│                               │ └──────┘ └──────┘     │ │
│                               │                       │ │
│                               └───────────────────────┘ │
│                                                         │
└─────────────────────────────────────────────────────────┘

§File Size & Buffer Sizes

We expose a ‘MAX FILE SIZE’ parameter to the user, but it is difficult to enforce exactly because we don’t know the size of the data we’re writing until a parquet row-group is flushed: the encoded size of the data differs from its in-memory representation, and the data pages within each column of a row-group are compressed. We also don’t know the exact size of the parquet metadata that will be written to the file.

Therefore we don’t use the S3MultiPartUploader’s hard file size limit since it’s difficult to handle those errors after we’ve already flushed data to the ArrowWriter. Instead we implement a crude check ourselves.

This check aims to hit the max-size limit but may exceed it by some amount. To keep that amount small, we set the max row-group size to a configurable ratio (e.g. 20%) of the max_file_size. This determines how often we’ll flush a row-group, but it is only an approximation since the actual size of a row-group is not known until it’s written. After each row-group is flushed, the size of the file is checked, and if it has exceeded max_file_size a new file is started.
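The check described above can be sketched as follows. This is a minimal illustration, not the uploader's actual code: `FileSizeTracker`, its fields, and `record_row_group` are hypothetical names, and the strict `>` comparison is an assumption about how "exceeded" is evaluated.

```rust
/// Hypothetical stand-in for the file being written. It only tracks how many
/// encoded bytes have been flushed so far and which file we are on.
struct FileSizeTracker {
    max_file_size: u64,
    bytes_written: u64,
    file_index: usize,
}

impl FileSizeTracker {
    fn new(max_file_size: u64) -> Self {
        FileSizeTracker {
            max_file_size,
            bytes_written: 0,
            file_index: 0,
        }
    }

    /// Record a flushed row group. Its encoded size is only known at this
    /// point, so the limit can be overshot by up to roughly one row group.
    /// Returns `true` if the limit was exceeded and a new file was started.
    fn record_row_group(&mut self, encoded_bytes: u64) -> bool {
        self.bytes_written = self.bytes_written.saturating_add(encoded_bytes);
        if self.bytes_written > self.max_file_size {
            // Assumption: rolling over resets the byte count and moves on to
            // the next file index.
            self.file_index += 1;
            self.bytes_written = 0;
            true
        } else {
            false
        }
    }
}
```

With a 1000-byte limit and 400-byte row groups, the third flush brings the file to 1200 bytes and triggers the rollover, so the file overshoots the limit by 200 bytes. Smaller row groups shrink that overshoot, which is the motivation for the ratio.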

We also set the max ArrowBuilder buffer size to a ratio (e.g. 150%) of the row-group size to avoid the ArrowWriter buffering too much data itself before flushing a row-group. We’re aiming for the encoded & compressed size of the ArrowBuilder data to be roughly equal to the row-group size, but this is only an approximation.
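As a rough sketch of how the two ratios chain together, assuming they are expressed as integer percentages (the real parameters may be represented differently): `BufferSizes` and `derive_buffer_sizes` are hypothetical names, while the two field names mirror the struct fields documented on this page.

```rust
/// Hypothetical container for the two derived buffer sizes.
#[derive(Debug, PartialEq)]
struct BufferSizes {
    row_group_size_bytes: u64,
    arrow_builder_buffer_bytes: u64,
}

/// Scale `value` by `percent` / 100, widening to u128 so the multiplication
/// cannot overflow, and saturating if the result does not fit in a u64.
fn scale_percent(value: u64, percent: u64) -> u64 {
    let scaled = u128::from(value) * u128::from(percent) / 100;
    u64::try_from(scaled).unwrap_or(u64::MAX)
}

/// Derive the row-group size from the max file size, then the ArrowBuilder
/// buffer size from the row-group size.
fn derive_buffer_sizes(
    max_file_size: u64,
    row_group_percent: u64,
    builder_percent: u64,
) -> BufferSizes {
    let row_group_size_bytes = scale_percent(max_file_size, row_group_percent);
    let arrow_builder_buffer_bytes = scale_percent(row_group_size_bytes, builder_percent);
    BufferSizes {
        row_group_size_bytes,
        arrow_builder_buffer_bytes,
    }
}
```

For a 256 MiB max file size with the example ratios of 20% and 150%, this gives a row-group target of roughly 51 MiB and an ArrowBuilder buffer of roughly 77 MiB of in-memory data.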

TODO: We may want to consider adding additional limits to the buffer sizes to avoid memory issues if the user sets the max file size to be very large.

Fields§

§desc: Arc<RelationDesc>

The output description.

§next_file_index: usize

The index of the next file to upload within the batch.

§key_manager: S3KeyManager

Provides the appropriate bucket and object keys to use for uploads.

§batch: u64

Identifies the batch that files uploaded by this uploader belong to.

§max_file_size: u64

The desired file size. A new file upload will be started when the size exceeds this amount.

§sdk_config: Arc<SdkConfig>

The AWS SDK config.

§row_group_size_bytes: u64

§arrow_builder_buffer_bytes: u64

§active_file: Option<ParquetFile>

The active parquet file being written to, stored in an option since it won’t be initialized until the builder is first flushed, and to make it easier to take ownership when calling in spawned tokio tasks (to avoid doing I/O in the surrounding timely context).
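The ownership pattern this enables can be sketched with `Option::take`. This is an illustration under stated assumptions: `ToyFile` and `ToyUploader` are hypothetical stand-ins, and `std::thread::spawn` is used in place of a tokio task so the example has no dependencies.

```rust
use std::thread;

/// Hypothetical stand-in for `ParquetFile`.
struct ToyFile {
    rows: Vec<String>,
}

impl ToyFile {
    /// Pretend to do the blocking I/O and report how many rows were written.
    fn finish(self) -> usize {
        self.rows.len()
    }
}

/// Hypothetical stand-in for the uploader, keeping only the optional file.
struct ToyUploader {
    active_file: Option<ToyFile>,
}

impl ToyUploader {
    /// Move the active file (if any) into a worker and wait for its result.
    fn finish_active(&mut self) -> Option<usize> {
        // `take` leaves `None` behind and hands us an owned file, so the
        // worker closure does not need to borrow `self`.
        let file = self.active_file.take()?;
        let handle = thread::spawn(move || file.finish());
        // A thread stands in for a spawned tokio task here; joining it plays
        // the role of awaiting the task's handle.
        Some(handle.join().expect("worker thread panicked"))
    }
}
```

Because the closure must be `'static`, it cannot hold a reference into the uploader. Storing the file in an `Option` makes it cheap to move the file out and leaves the field in a valid state.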

§params: CopyToParameters

Upload and buffer parameters.

Implementations§

impl ParquetUploader

async fn start_new_file(&mut self) -> Result<&mut ParquetFile, Error>

Start a new parquet file for upload. Will finish the current file if one is active.
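A synchronous sketch of the behavior described here, using hypothetical stand-in types: the real method is async and uploads to S3, and whether `next_file_index` is advanced inside this method is an assumption based on the field's description.

```rust
/// Hypothetical stand-in for `ParquetFile`.
#[derive(Debug)]
struct ToyFile {
    index: usize,
    finished: bool,
}

/// Hypothetical stand-in for the uploader.
struct ToyUploader {
    next_file_index: usize,
    active_file: Option<ToyFile>,
    /// Files that have been finished, kept only so the example is observable.
    finished: Vec<ToyFile>,
}

impl ToyUploader {
    /// Finish the current file if one is active, then start a new one.
    fn start_new_file(&mut self) -> Result<&mut ToyFile, String> {
        if let Some(mut file) = self.active_file.take() {
            // The real uploader would flush and complete the upload here.
            file.finished = true;
            self.finished.push(file);
        }
        let file = ToyFile {
            index: self.next_file_index,
            finished: false,
        };
        // Assumption: each new file consumes one index within the batch.
        self.next_file_index += 1;
        // `Option::insert` stores the file and returns a mutable reference.
        Ok(self.active_file.insert(file))
    }
}
```

Returning `&mut ToyFile` lets the caller write into the new file immediately without unwrapping the `Option` again.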

Trait Implementations§

impl CopyToS3Uploader for ParquetUploader

fn new(sdk_config: SdkConfig, connection_details: S3UploadInfo, sink_id: &GlobalId, batch: u64, params: CopyToParameters) -> Result<ParquetUploader, Error>

async fn append_row(&mut self, row: &Row) -> Result<(), Error>

Append a row to the internal buffer, and optionally flush the buffer to S3.

async fn finish(&mut self) -> Result<(), Error>

Flush the full remaining internal buffer to S3, and close all open resources. This will be called when the input stream is finished.
