Struct mz_persist_client::PersistClient

source ·
pub struct PersistClient {
    pub(crate) cfg: PersistConfig,
    pub(crate) blob: Arc<dyn Blob>,
    pub(crate) consensus: Arc<dyn Consensus>,
    pub(crate) metrics: Arc<Metrics>,
    pub(crate) isolated_runtime: Arc<IsolatedRuntime>,
    pub(crate) shared_states: Arc<StateCache>,
    pub(crate) pubsub_sender: Arc<dyn PubSubSender>,
}
Expand description

A handle for interacting with the set of persist shard made durable at a single PersistLocation.

All async methods on PersistClient retry for as long as they are able, but the returned std::future::Futures implement “cancel on drop” semantics. This means that callers can add a timeout using tokio::time::timeout or tokio::time::timeout_at.

tokio::time::timeout(timeout, client.open::<String, String, u64, i64>(id,
    Arc::new(StringSchema),Arc::new(StringSchema),diagnostics, true)).await

Fields§

§cfg: PersistConfig§blob: Arc<dyn Blob>§consensus: Arc<dyn Consensus>§metrics: Arc<Metrics>§isolated_runtime: Arc<IsolatedRuntime>§shared_states: Arc<StateCache>§pubsub_sender: Arc<dyn PubSubSender>

Implementations§

source§

impl PersistClient

source

pub fn new( cfg: PersistConfig, blob: Arc<dyn Blob>, consensus: Arc<dyn Consensus>, metrics: Arc<Metrics>, isolated_runtime: Arc<IsolatedRuntime>, shared_states: Arc<StateCache>, pubsub_sender: Arc<dyn PubSubSender>, ) -> Result<Self, ExternalError>

Returns a new client for interfacing with persist shards made durable to the given Blob and Consensus.

This is exposed mostly for testing. Persist users likely want crate::cache::PersistClientCache::open.

source

pub async fn new_for_tests() -> Self

Returns a new in-mem PersistClient for tests and examples.

source

pub fn dyncfgs(&self) -> &ConfigSet

Returns persist’s ConfigSet.

source

pub(crate) async fn make_machine<K, V, T, D>( &self, shard_id: ShardId, diagnostics: Diagnostics, ) -> Result<Machine<K, V, T, D>, InvalidUsage<T>>
where K: Debug + Codec, V: Debug + Codec, T: Timestamp + Lattice + Codec64 + Sync, D: Semigroup + Codec64 + Send + Sync,

source

pub async fn open<K, V, T, D>( &self, shard_id: ShardId, key_schema: Arc<K::Schema>, val_schema: Arc<V::Schema>, diagnostics: Diagnostics, use_critical_since: bool, ) -> Result<(WriteHandle<K, V, T, D>, ReadHandle<K, V, T, D>), InvalidUsage<T>>
where K: Debug + Codec, V: Debug + Codec, T: Timestamp + Lattice + Codec64 + Sync, D: Semigroup + Ord + Codec64 + Send + Sync,

Provides capabilities for the durable TVC identified by shard_id at its current since and upper frontiers.

This method is a best-effort attempt to regain control of the frontiers of a shard. Its most common uses are to recover capabilities that have expired (leases) or to attempt to read a TVC that one did not create (or otherwise receive capabilities for). If the frontiers have been fully released by all other parties, this call may result in capabilities with empty frontiers (which are useless).

If shard_id has never been used before, initializes a new shard and returns handles with since and upper frontiers set to initial values of Antichain::from_elem(T::minimum()).

The schema parameter is currently unused, but should be an object that represents the schema of the data in the shard. This will be required in the future.

source

pub async fn open_leased_reader<K, V, T, D>( &self, shard_id: ShardId, key_schema: Arc<K::Schema>, val_schema: Arc<V::Schema>, diagnostics: Diagnostics, use_critical_since: bool, ) -> Result<ReadHandle<K, V, T, D>, InvalidUsage<T>>
where K: Debug + Codec, V: Debug + Codec, T: Timestamp + Lattice + Codec64 + Sync, D: Semigroup + Codec64 + Send + Sync,

Self::open, but returning only a ReadHandle.

Use this to save latency and a bit of persist traffic if you’re just going to immediately drop or expire the WriteHandle.

The _schema parameter is currently unused, but should be an object that represents the schema of the data in the shard. This will be required in the future.

source

pub async fn create_batch_fetcher<K, V, T, D>( &self, shard_id: ShardId, key_schema: Arc<K::Schema>, val_schema: Arc<V::Schema>, is_transient: bool, diagnostics: Diagnostics, ) -> Result<BatchFetcher<K, V, T, D>, InvalidUsage<T>>
where K: Debug + Codec, V: Debug + Codec, T: Timestamp + Lattice + Codec64 + Sync, D: Semigroup + Codec64 + Send + Sync,

Creates and returns a BatchFetcher for the given shard id.

source

pub const CONTROLLER_CRITICAL_SINCE: CriticalReaderId = _

A convenience CriticalReaderId for Materialize controllers.

For most (soon to be all?) shards in Materialize, a centralized “controller” is the authority for when a user no longer needs to read at a given frontier. (Other uses are temporary holds where correctness of the overall system can be maintained through a lease timeout.) To make SinceHandle easier to work with, we offer this convenience id for Materialize controllers, so they don’t have to durably record it.

TODO: We’re still shaking out whether the controller should be the only critical since hold or if there are other places we want them. If the former, we should remove CriticalReaderId and bake in the singular nature of the controller critical handle.

// This prints as something that is not 0 but is visually recognizable.
assert_eq!(
    mz_persist_client::PersistClient::CONTROLLER_CRITICAL_SINCE.to_string(),
    "c00000000-1111-2222-3333-444444444444",
)
source

pub async fn open_critical_since<K, V, T, D, O>( &self, shard_id: ShardId, reader_id: CriticalReaderId, diagnostics: Diagnostics, ) -> Result<SinceHandle<K, V, T, D, O>, InvalidUsage<T>>
where K: Debug + Codec, V: Debug + Codec, T: Timestamp + Lattice + Codec64 + Sync, D: Semigroup + Codec64 + Send + Sync, O: Opaque + Codec64,

Provides a capability for the durable TVC identified by shard_id at its current since frontier.

In contrast to the time-leased ReadHandle returned by Self::open and Self::open_leased_reader, this handle and its associated capability are not leased. A SinceHandle does not release its since capability; downgrade to the empty antichain to hold back the since. Also unlike ReadHandle, the handle is not expired on drop. This is less ergonomic, but useful for “critical” since holds which must survive even lease timeouts.

IMPORTANT: The above means that if a SinceHandle is registered and then lost, the shard’s since will be permanently “stuck”, forever preventing logical compaction. Users are advised to durably record (preferably in code) the intended CriticalReaderId before registering a SinceHandle (in case the process crashes at the wrong time).

If shard_id has never been used before, initializes a new shard and return a handle with its since frontier set to the initial value of Antichain::from_elem(T::minimum()).

source

pub async fn open_writer<K, V, T, D>( &self, shard_id: ShardId, key_schema: Arc<K::Schema>, val_schema: Arc<V::Schema>, diagnostics: Diagnostics, ) -> Result<WriteHandle<K, V, T, D>, InvalidUsage<T>>
where K: Debug + Codec, V: Debug + Codec, T: Timestamp + Lattice + Codec64 + Sync, D: Semigroup + Ord + Codec64 + Send + Sync,

Self::open, but returning only a WriteHandle.

Use this to save latency and a bit of persist traffic if you’re just going to immediately drop or expire the ReadHandle.

The _schema parameter is currently unused, but should be an object that represents the schema of the data in the shard. This will be required in the future.

source

pub async fn get_schema<K, V, T, D>( &self, shard_id: ShardId, schema_id: SchemaId, diagnostics: Diagnostics, ) -> Result<Option<(K::Schema, V::Schema)>, InvalidUsage<T>>
where K: Debug + Codec, V: Debug + Codec, T: Timestamp + Lattice + Codec64 + Sync, D: Semigroup + Codec64 + Send + Sync,

Returns the requested schema, if known at the current state.

source

pub async fn latest_schema<K, V, T, D>( &self, shard_id: ShardId, diagnostics: Diagnostics, ) -> Result<Option<(SchemaId, K::Schema, V::Schema)>, InvalidUsage<T>>
where K: Debug + Codec, V: Debug + Codec, T: Timestamp + Lattice + Codec64 + Sync, D: Semigroup + Codec64 + Send + Sync,

Returns the latest schema registered at the current state.

source

pub async fn compare_and_evolve_schema<K, V, T, D>( &self, shard_id: ShardId, expected: SchemaId, key_schema: &K::Schema, val_schema: &V::Schema, diagnostics: Diagnostics, ) -> Result<CaESchema<K, V>, InvalidUsage<T>>
where K: Debug + Codec, V: Debug + Codec, T: Timestamp + Lattice + Codec64 + Sync, D: Semigroup + Codec64 + Send + Sync,

Registers a new latest schema for the given shard.

This new schema must be backward_compatible with all previous schemas for this shard. If it’s not, CaESchema::Incompatible is returned.

To prevent races, the caller must declare what it believes to be the latest schema id. If this doesn’t match reality, CaESchema::ExpectedMismatch is returned.

source

pub async fn is_finalized<K, V, T, D>( &self, shard_id: ShardId, diagnostics: Diagnostics, ) -> Result<bool, InvalidUsage<T>>
where K: Debug + Codec, V: Debug + Codec, T: Timestamp + Lattice + Codec64 + Sync, D: Semigroup + Codec64 + Send + Sync,

Check if the given shard is in a finalized state; ie. it can no longer be read, any data that was written to it is no longer accessible, and we’ve discarded references to that data from state.

source

pub async fn finalize_shard<K, V, T, D>( &self, shard_id: ShardId, diagnostics: Diagnostics, ) -> Result<(), InvalidUsage<T>>
where K: Debug + Codec, V: Debug + Codec, T: Timestamp + Lattice + Codec64 + Sync, D: Semigroup + Codec64 + Send + Sync,

If a shard is guaranteed to never be used again, finalize it to delete the associated data and release any associated resources. (Except for a little state in consensus we use to represent the tombstone.)

The caller should ensure that both the since and upper of the shard have been advanced to []: ie. the shard is no longer writable or readable. Otherwise an error is returned.

Once finalize_shard has been called, the result of future operations on the shard are not defined. They may return errors or succeed as a noop.

source

pub async fn inspect_shard<T: Timestamp + Lattice + Codec64>( &self, shard_id: &ShardId, ) -> Result<impl Serialize, Error>

Returns the internal state of the shard for debugging and QA.

We’ll be thoughtful about making unnecessary changes, but the output of this method needs to be gated from users, so that it’s not subject to our backward compatibility guarantees.

source

pub fn metrics(&self) -> &Arc<Metrics>

Return the metrics being used by this client.

Only exposed for tests, persistcli, and benchmarks.

Trait Implementations§

source§

impl Clone for PersistClient

source§

fn clone(&self) -> PersistClient

Returns a copy of the value. Read more
1.0.0 · source§

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source. Read more
source§

impl Debug for PersistClient

source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter. Read more

Auto Trait Implementations§

Blanket Implementations§

source§

impl<T> Any for T
where T: 'static + ?Sized,

source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
source§

impl<T> Borrow<T> for T
where T: ?Sized,

source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
source§

impl<T, U> CastInto<U> for T
where U: CastFrom<T>,

source§

fn cast_into(self) -> U

Performs the cast.
source§

impl<T> CloneToUninit for T
where T: Clone,

source§

default unsafe fn clone_to_uninit(&self, dst: *mut T)

🔬This is a nightly-only experimental API. (clone_to_uninit)
Performs copy-assignment from self to dst. Read more
source§

impl<T> CopyAs<T> for T

source§

fn copy_as(self) -> T

source§

impl<T> From<T> for T

source§

fn from(t: T) -> T

Returns the argument unchanged.

source§

impl<T> FromRef<T> for T
where T: Clone,

source§

fn from_ref(input: &T) -> T

Converts to this type from a reference to the input type.
source§

impl<T> FutureExt for T

source§

fn with_context(self, otel_cx: Context) -> WithContext<Self>

Attaches the provided Context to this type, returning a WithContext wrapper. Read more
source§

fn with_current_context(self) -> WithContext<Self>

Attaches the current Context to this type, returning a WithContext wrapper. Read more
source§

impl<T> Instrument for T

source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
source§

impl<T, U> Into<U> for T
where U: From<T>,

source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

source§

impl<T> IntoRequest<T> for T

source§

fn into_request(self) -> Request<T>

Wrap the input message T in a tonic::Request
source§

impl<Unshared, Shared> IntoShared<Shared> for Unshared
where Shared: FromUnshared<Unshared>,

source§

fn into_shared(self) -> Shared

Creates a shared type from an unshared type.
source§

impl<T> Pointable for T

source§

const ALIGN: usize = _

The alignment of pointer.
§

type Init = T

The type for initializers.
source§

unsafe fn init(init: <T as Pointable>::Init) -> usize

Initializes a with the given initializer. Read more
source§

unsafe fn deref<'a>(ptr: usize) -> &'a T

Dereferences the given pointer. Read more
source§

unsafe fn deref_mut<'a>(ptr: usize) -> &'a mut T

Mutably dereferences the given pointer. Read more
source§

unsafe fn drop(ptr: usize)

Drops the object pointed to by the given pointer. Read more
source§

impl<T> ProgressEventTimestamp for T
where T: Data + Debug + Any,

source§

fn as_any(&self) -> &(dyn Any + 'static)

Upcasts this ProgressEventTimestamp to Any. Read more
source§

fn type_name(&self) -> &'static str

Returns the name of the concrete type of this object. Read more
source§

impl<P, R> ProtoType<R> for P
where R: RustType<P>,

source§

impl<T> Same for T

§

type Output = T

Should always be Self
source§

impl<'a, S, T> Semigroup<&'a S> for T
where T: Semigroup<S>,

source§

fn plus_equals(&mut self, rhs: &&'a S)

The method of std::ops::AddAssign, for types that do not implement AddAssign.
source§

impl<T> ToOwned for T
where T: Clone,

§

type Owned = T

The resulting type after obtaining ownership.
source§

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning. Read more
source§

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning. Read more
source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

§

type Error = Infallible

The type returned in the event of a conversion error.
source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
source§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

source§

fn vzip(self) -> V

source§

impl<T> WithSubscriber for T

source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more
source§

impl<T> Data for T
where T: Clone + 'static,