Struct mz_persist_client::write::WriteHandle

source ·
pub struct WriteHandle<K: Codec, V: Codec, T, D> {
    pub(crate) cfg: PersistConfig,
    pub(crate) metrics: Arc<Metrics>,
    pub(crate) machine: Machine<K, V, T, D>,
    pub(crate) gc: GarbageCollector<K, V, T, D>,
    pub(crate) compact: Option<Compactor<K, V, T, D>>,
    pub(crate) blob: Arc<dyn Blob>,
    pub(crate) isolated_runtime: Arc<IsolatedRuntime>,
    pub(crate) writer_id: WriterId,
    pub(crate) debug_state: HandleDebugState,
    pub(crate) write_schemas: Schemas<K, V>,
    pub(crate) upper: Antichain<T>,
    expire_fn: Option<ExpireFn>,
}
Expand description

A “capability” granting the ability to apply updates to some shard at times greater or equal to self.upper().

All async methods on ReadHandle retry for as long as they are able, but the returned std::future::Futures implement “cancel on drop” semantics. This means that callers can add a timeout using tokio::time::timeout or tokio::time::timeout_at.

tokio::time::timeout(timeout, write.fetch_recent_upper()).await

Fields§

§cfg: PersistConfig§metrics: Arc<Metrics>§machine: Machine<K, V, T, D>§gc: GarbageCollector<K, V, T, D>§compact: Option<Compactor<K, V, T, D>>§blob: Arc<dyn Blob>§isolated_runtime: Arc<IsolatedRuntime>§writer_id: WriterId§debug_state: HandleDebugState§write_schemas: Schemas<K, V>§upper: Antichain<T>§expire_fn: Option<ExpireFn>

Implementations§

source§

impl<K, V, T, D> WriteHandle<K, V, T, D>
where K: Debug + Codec, V: Debug + Codec, T: Timestamp + Lattice + Codec64 + Sync, D: Semigroup + Ord + Codec64 + Send + Sync,

source

pub(crate) fn new( cfg: PersistConfig, metrics: Arc<Metrics>, machine: Machine<K, V, T, D>, gc: GarbageCollector<K, V, T, D>, blob: Arc<dyn Blob>, writer_id: WriterId, purpose: &str, write_schemas: Schemas<K, V>, ) -> Self

source

pub fn from_read(read: &ReadHandle<K, V, T, D>, purpose: &str) -> Self

Creates a WriteHandle for the same shard from an existing ReadHandle.

source

pub fn shard_id(&self) -> ShardId

This handle’s shard id.

source

pub fn schema_id(&self) -> Option<SchemaId>

Returns the schema of this writer.

source

pub fn upper(&self) -> &Antichain<T>

A cached version of the shard-global upper frontier.

This is the most recent upper discovered by this handle. It is potentially more stale than Self::shared_upper but is lock-free and allocation-free. This will always be less or equal to the shard-global upper.

source

pub fn shared_upper(&self) -> Antichain<T>

A less-stale cached version of the shard-global upper frontier.

This is the most recently known upper for this shard process-wide, but unlike Self::upper it requires a mutex and a clone. This will always be less or equal to the shard-global upper.

source

pub async fn fetch_recent_upper(&mut self) -> &Antichain<T>

Fetches and returns a recent shard-global upper. Importantly, this operation is linearized with write operations.

This requires fetching the latest state from consensus and is therefore a potentially expensive operation.

source

pub async fn append<SB, KB, VB, TB, DB, I>( &mut self, updates: I, lower: Antichain<T>, upper: Antichain<T>, ) -> Result<Result<(), UpperMismatch<T>>, InvalidUsage<T>>
where SB: Borrow<((KB, VB), TB, DB)>, KB: Borrow<K>, VB: Borrow<V>, TB: Borrow<T>, DB: Borrow<D>, I: IntoIterator<Item = SB>, D: Send + Sync,

Applies updates to this shard and downgrades this handle’s upper to upper.

The innermost Result is Ok if the updates were successfully written. If not, an Upper err containing the current writer upper is returned. If that happens, we also update our local upper to match the current upper. This is useful in cases where a timeout happens in between a successful write and returning that to the client.

In contrast to Self::compare_and_append, multiple WriteHandles may be used concurrently to write to the same shard, but in this case, the data being written must be identical (in the sense of “definite”-ness). It’s intended for replicated use by source ingestion, sinks, etc.

All times in updates must be greater or equal to lower and not greater or equal to upper. A upper of the empty antichain “finishes” this shard, promising that no more data is ever incoming.

updates may be empty, which allows for downgrading upper to communicate progress. It is possible to call this with upper equal to self.upper() and an empty updates (making the call a no-op).

This uses a bounded amount of memory, even when updates is very large. Individual records, however, should be small enough that we can reasonably chunk them up: O(KB) is definitely fine, O(MB) come talk to us.

The clunky multi-level Result is to enable more obvious error handling in the caller. See http://sled.rs/errors.html for details.

source

pub async fn compare_and_append<SB, KB, VB, TB, DB, I>( &mut self, updates: I, expected_upper: Antichain<T>, new_upper: Antichain<T>, ) -> Result<Result<(), UpperMismatch<T>>, InvalidUsage<T>>
where SB: Borrow<((KB, VB), TB, DB)>, KB: Borrow<K>, VB: Borrow<V>, TB: Borrow<T>, DB: Borrow<D>, I: IntoIterator<Item = SB>, D: Send + Sync,

Applies updates to this shard and downgrades this handle’s upper to new_upper iff the current global upper of this shard is expected_upper.

The innermost Result is Ok if the updates were successfully written. If not, an Upper err containing the current global upper is returned.

In contrast to Self::append, this linearizes mutations from all writers. It’s intended for use as an atomic primitive for timestamp bindings, SQL tables, etc.

All times in updates must be greater or equal to expected_upper and not greater or equal to new_upper. A new_upper of the empty antichain “finishes” this shard, promising that no more data is ever incoming.

updates may be empty, which allows for downgrading upper to communicate progress. It is possible to heartbeat a writer lease by calling this with new_upper equal to self.upper() and an empty updates (making the call a no-op).

This uses a bounded amount of memory, even when updates is very large. Individual records, however, should be small enough that we can reasonably chunk them up: O(KB) is definitely fine, O(MB) come talk to us.

The clunky multi-level Result is to enable more obvious error handling in the caller. See http://sled.rs/errors.html for details.

source

pub async fn append_batch( &mut self, batch: Batch<K, V, T, D>, lower: Antichain<T>, upper: Antichain<T>, ) -> Result<Result<(), UpperMismatch<T>>, InvalidUsage<T>>
where D: Send + Sync,

Appends the batch of updates to the shard and downgrades this handle’s upper to upper.

The innermost Result is Ok if the updates were successfully written. If not, an Upper err containing the current writer upper is returned. If that happens, we also update our local upper to match the current upper. This is useful in cases where a timeout happens in between a successful write and returning that to the client.

In contrast to Self::compare_and_append_batch, multiple WriteHandles may be used concurrently to write to the same shard, but in this case, the data being written must be identical (in the sense of “definite”-ness). It’s intended for replicated use by source ingestion, sinks, etc.

A upper of the empty antichain “finishes” this shard, promising that no more data is ever incoming.

The batch may be empty, which allows for downgrading upper to communicate progress. It is possible to heartbeat a writer lease by calling this with upper equal to self.upper() and an empty updates (making the call a no-op).

The clunky multi-level Result is to enable more obvious error handling in the caller. See http://sled.rs/errors.html for details.

source

pub async fn compare_and_append_batch( &mut self, batches: &mut [&mut Batch<K, V, T, D>], expected_upper: Antichain<T>, new_upper: Antichain<T>, ) -> Result<Result<(), UpperMismatch<T>>, InvalidUsage<T>>
where D: Send + Sync,

Appends the batch of updates to the shard and downgrades this handle’s upper to new_upper iff the current global upper of this shard is expected_upper.

The innermost Result is Ok if the batch was successfully written. If not, an Upper err containing the current global upper is returned.

In contrast to Self::append_batch, this linearizes mutations from all writers. It’s intended for use as an atomic primitive for timestamp bindings, SQL tables, etc.

A new_upper of the empty antichain “finishes” this shard, promising that no more data is ever incoming.

The batch may be empty, which allows for downgrading upper to communicate progress. It is possible to heartbeat a writer lease by calling this with new_upper equal to self.upper() and an empty updates (making the call a no-op).

IMPORTANT: In case of an erroneous result the caller is responsible for the lifecycle of the batch. It can be deleted or it can be used to retry with adjusted frontiers.

The clunky multi-level Result is to enable more obvious error handling in the caller. See http://sled.rs/errors.html for details.

source

pub fn batch_from_transmittable_batch( &self, batch: ProtoBatch, ) -> Batch<K, V, T, D>

Turns the given ProtoBatch back into a Batch which can be used to append it to this shard.

source

pub fn builder(&self, lower: Antichain<T>) -> BatchBuilder<K, V, T, D>

Returns a BatchBuilder that can be used to write a batch of updates to blob storage which can then be appended to this shard using Self::compare_and_append_batch or Self::append_batch.

It is correct to create an empty batch, which allows for downgrading upper to communicate progress. (see Self::compare_and_append_batch or Self::append_batch)

The builder uses a bounded amount of memory, even when the number of updates is very large. Individual records, however, should be small enough that we can reasonably chunk them up: O(KB) is definitely fine, O(MB) come talk to us.

source

pub async fn batch<SB, KB, VB, TB, DB, I>( &mut self, updates: I, lower: Antichain<T>, upper: Antichain<T>, ) -> Result<Batch<K, V, T, D>, InvalidUsage<T>>
where SB: Borrow<((KB, VB), TB, DB)>, KB: Borrow<K>, VB: Borrow<V>, TB: Borrow<T>, DB: Borrow<D>, I: IntoIterator<Item = SB>,

Uploads the given updates as one Batch to the blob store and returns a handle to the batch.

source

pub async fn wait_for_upper_past(&mut self, frontier: &Antichain<T>)

Blocks until the given frontier is less than the upper of the shard.

source

pub async fn expire(self)

Politely expires this writer, releasing any associated state.

There is a best-effort impl in Drop to expire a writer that wasn’t explictly expired with this method. When possible, explicit expiry is still preferred because the Drop one is best effort and is dependant on a tokio Handle being available in the TLC at the time of drop (which is a bit subtle). Also, explicit expiry allows for control over when it happens.

source

fn expire_fn( machine: Machine<K, V, T, D>, gc: GarbageCollector<K, V, T, D>, writer_id: WriterId, ) -> ExpireFn

Trait Implementations§

source§

impl<K: Debug + Codec, V: Debug + Codec, T: Debug, D: Debug> Debug for WriteHandle<K, V, T, D>

source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter. Read more
source§

impl<K: Codec, V: Codec, T, D> Drop for WriteHandle<K, V, T, D>

source§

fn drop(&mut self)

Executes the destructor for this type. Read more

Auto Trait Implementations§

§

impl<K, V, T, D> Freeze for WriteHandle<K, V, T, D>
where T: Freeze,

§

impl<K, V, T, D> !RefUnwindSafe for WriteHandle<K, V, T, D>

§

impl<K, V, T, D> Send for WriteHandle<K, V, T, D>
where T: Send + Sync,

§

impl<K, V, T, D> Sync for WriteHandle<K, V, T, D>
where T: Sync + Send,

§

impl<K, V, T, D> Unpin for WriteHandle<K, V, T, D>
where T: Unpin,

§

impl<K, V, T, D> !UnwindSafe for WriteHandle<K, V, T, D>

Blanket Implementations§

source§

impl<T> Any for T
where T: 'static + ?Sized,

source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
source§

impl<T> Borrow<T> for T
where T: ?Sized,

source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
source§

impl<T, U> CastInto<U> for T
where U: CastFrom<T>,

source§

fn cast_into(self) -> U

Performs the cast.
source§

impl<T> From<T> for T

source§

fn from(t: T) -> T

Returns the argument unchanged.

source§

impl<T> FutureExt for T

source§

fn with_context(self, otel_cx: Context) -> WithContext<Self>

Attaches the provided Context to this type, returning a WithContext wrapper. Read more
source§

fn with_current_context(self) -> WithContext<Self>

Attaches the current Context to this type, returning a WithContext wrapper. Read more
source§

impl<T> Instrument for T

source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
source§

impl<T, U> Into<U> for T
where U: From<T>,

source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

source§

impl<T> IntoRequest<T> for T

source§

fn into_request(self) -> Request<T>

Wrap the input message T in a tonic::Request
source§

impl<Unshared, Shared> IntoShared<Shared> for Unshared
where Shared: FromUnshared<Unshared>,

source§

fn into_shared(self) -> Shared

Creates a shared type from an unshared type.
source§

impl<T> Pointable for T

source§

const ALIGN: usize = _

The alignment of pointer.
§

type Init = T

The type for initializers.
source§

unsafe fn init(init: <T as Pointable>::Init) -> usize

Initializes a with the given initializer. Read more
source§

unsafe fn deref<'a>(ptr: usize) -> &'a T

Dereferences the given pointer. Read more
source§

unsafe fn deref_mut<'a>(ptr: usize) -> &'a mut T

Mutably dereferences the given pointer. Read more
source§

unsafe fn drop(ptr: usize)

Drops the object pointed to by the given pointer. Read more
source§

impl<P, R> ProtoType<R> for P
where R: RustType<P>,

source§

impl<T> Same for T

§

type Output = T

Should always be Self
source§

impl<'a, S, T> Semigroup<&'a S> for T
where T: Semigroup<S>,

source§

fn plus_equals(&mut self, rhs: &&'a S)

The method of std::ops::AddAssign, for types that do not implement AddAssign.
source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

§

type Error = Infallible

The type returned in the event of a conversion error.
source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
source§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

source§

fn vzip(self) -> V

source§

impl<T> WithSubscriber for T

source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more