Struct mz_persist_client::cfg::PersistConfig

pub struct PersistConfig {
    pub build_version: Version,
    pub hostname: String,
    pub is_cc_active: bool,
    pub announce_memory_limit: Option<usize>,
    pub now: NowFn,
    pub configs: ConfigSet,
    configs_synced_once: Arc<Sender<bool>>,
    pub dynamic: Arc<DynamicConfig>,
    pub compaction_enabled: bool,
    pub compaction_concurrency_limit: usize,
    pub compaction_queue_size: usize,
    pub compaction_yield_after_n_updates: usize,
    pub consensus_connection_pool_max_size: usize,
    pub consensus_connection_pool_max_wait: Option<Duration>,
    pub writer_lease_duration: Duration,
    pub critical_downgrade_interval: Duration,
    pub pubsub_connect_attempt_timeout: Duration,
    pub pubsub_request_timeout: Duration,
    pub pubsub_connect_max_backoff: Duration,
    pub pubsub_client_sender_channel_size: usize,
    pub pubsub_client_receiver_channel_size: usize,
    pub pubsub_server_connection_channel_size: usize,
    pub pubsub_state_cache_shard_ref_channel_size: usize,
    pub pubsub_reconnect_backoff: Duration,
    pub isolated_runtime_worker_threads: usize,
}

The tunable knobs for persist.

Tuning inputs:

  • A larger blob_target_size (capped at KEY_VAL_DATA_MAX_LEN) results in fewer entries in consensus state. Until we have compaction and/or incremental state, consensus state grows without bound, so this is a concern. OTOH, for any “reasonable” blob_target_size (> 100MiB?), it seems we’d end up with a pretty tremendous amount of data in the shard before this became a real issue.
  • A larger blob_target_size results in fewer s3 operations, which are charged per operation. (Hmm, maybe not if we’re charged per call in a multipart op. The S3Blob impl already chunks things at 8MiB.)
  • A smaller blob_target_size will result in more even memory usage in readers.
  • A larger batch_builder_max_outstanding_parts increases throughput (to a point).
  • A smaller batch_builder_max_outstanding_parts provides a bound on the amount of memory used by a writer.
  • A larger compaction_heuristic_min_inputs means state size is larger.
  • A smaller compaction_heuristic_min_inputs means more compactions happen (higher write amp).
  • A larger compaction_heuristic_min_updates means more consolidations are discovered while reading a snapshot (higher read amp and higher space amp).
  • A smaller compaction_heuristic_min_updates means more compactions happen (higher write amp).

Tuning logic:

  • blob_target_size was initially selected to be an exact multiple of 8MiB (the s3 multipart size) that was in the same neighborhood as our initial max throughput (~250MiB).
  • batch_builder_max_outstanding_parts was initially selected to be as small as possible without harming pipelining. 0 means no pipelining, 1 is full pipelining as long as generating data takes less time than writing to s3 (hopefully a fair assumption), 2 is a little extra slop on top of 1.
  • compaction_heuristic_min_inputs was set by running the open-loop benchmark with batches of size 10,240 bytes (selected to be small but such that the overhead of our columnar encoding format was less than 10%) and manually increased until the write amp stopped going down. This becomes much less important once we have incremental state. The initial value is a placeholder and should be revisited at some point.
  • compaction_heuristic_min_updates was set via a thought experiment. This is an O(n*log(n)) upper bound on the number of unconsolidated updates that would be consolidated if we compacted as the in-mem Spine does. The initial value is a placeholder and should be revisited at some point.
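
As a rough illustration of the arithmetic behind the first two knobs, the sketch below counts 8MiB multipart chunks for a candidate blob_target_size and estimates the writer memory implied by batch_builder_max_outstanding_parts. The helper names are hypothetical, and the memory estimate assumes each outstanding part holds at most one blob_target_size worth of data, which the notes above do not state explicitly.

```rust
/// S3 multipart chunk size mentioned in the tuning notes (8MiB).
const MULTIPART_CHUNK_BYTES: usize = 8 * 1024 * 1024;

/// Number of multipart chunks needed to upload one blob of `blob_target_size` bytes.
fn multipart_chunks(blob_target_size: usize) -> usize {
    // Ceiling division: an exact multiple of 8MiB leaves no short trailing chunk.
    (blob_target_size + MULTIPART_CHUNK_BYTES - 1) / MULTIPART_CHUNK_BYTES
}

/// Rough upper bound on bytes buffered by one writer.
/// Assumption of this sketch: each outstanding part is at most `blob_target_size` bytes.
fn writer_memory_bound(blob_target_size: usize, max_outstanding_parts: usize) -> usize {
    blob_target_size * max_outstanding_parts
}
```

With a hypothetical blob_target_size of 128MiB, multipart_chunks returns 16, and writer_memory_bound with 2 outstanding parts returns 256MiB.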

TODO: Move these tuning notes into SessionVar descriptions once we have SystemVars for most of these.


§build_version: Version

Info about which version of the code is running.

§hostname: String

Hostname of this persist user. Stored in state and used for debugging.

§is_cc_active: bool

Whether this persist instance is running in a “cc” sized cluster.

§announce_memory_limit: Option<usize>

Memory limit of the process, if known.

§now: NowFn

A clock to use for all leasing and other non-debugging use.

§configs: ConfigSet

Persist Configs that can change value dynamically within the lifetime of a process.

TODO(cfg): Entirely replace dynamic with this.

§configs_synced_once: Arc<Sender<bool>>

Indicates whether configs has been synced at least once with an upstream source.

§dynamic: Arc<DynamicConfig>

Configurations that can be dynamically updated.

§compaction_enabled: bool

Whether to physically and logically compact batches in blob storage.

§compaction_concurrency_limit: usize

In Compactor::compact_and_apply_background, the maximum number of concurrent compaction requests that can execute for a given shard.

§compaction_queue_size: usize

In Compactor::compact_and_apply_background, the maximum number of pending compaction requests to queue.

§compaction_yield_after_n_updates: usize

In Compactor::compact_and_apply_background, how many updates to encode or decode before voluntarily yielding the task.
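
The yielding behavior can be pictured with a small synchronous sketch. It is not the Compactor implementation: process_with_yields and the yield_now hook are hypothetical names, and a real async task would await a runtime-provided yield instead of calling a closure.

```rust
/// Handles every update, invoking `yield_now` after each run of
/// `yield_after_n` updates. `yield_now` stands in for whatever yield
/// mechanism the runtime offers (hypothetical hook).
fn process_with_yields<T>(
    updates: &[T],
    yield_after_n: usize,
    mut handle: impl FnMut(&T),
    mut yield_now: impl FnMut(),
) {
    let mut since_yield = 0;
    for update in updates {
        handle(update);
        since_yield += 1;
        // Give other tasks a chance to run once the budget is used up.
        if since_yield >= yield_after_n {
            since_yield = 0;
            yield_now();
        }
    }
}
```

A smaller value yields more often, trading some throughput for fairness toward other tasks on the same runtime.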

§consensus_connection_pool_max_size: usize

The maximum size of the connection pool to Postgres/CRDB when performing consensus reads and writes.

§consensus_connection_pool_max_wait: Option<Duration>

The maximum time to wait when attempting to obtain a connection from the pool.

§writer_lease_duration: Duration

Length of time after a writer’s last operation after which the writer may be expired.
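
A minimal sketch of the expiry check this duration implies, assuming the writer's last operation is tracked as a millisecond timestamp taken from the now clock. WriterState and may_expire are hypothetical names, not persist types.

```rust
use std::time::Duration;

/// Hypothetical bookkeeping for one writer; not the persist state type.
struct WriterState {
    /// Time of the writer's last operation, in milliseconds since the epoch.
    last_heartbeat_ms: u64,
}

/// Returns true once more than `lease` has elapsed since the last operation.
fn may_expire(writer: &WriterState, now_ms: u64, lease: Duration) -> bool {
    // saturating_sub guards against a clock that reads earlier than the heartbeat.
    let idle_ms = now_ms.saturating_sub(writer.last_heartbeat_ms);
    u128::from(idle_ms) > lease.as_millis()
}
```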

§critical_downgrade_interval: Duration

Length of time between critical handles’ calls to downgrade since.

§pubsub_connect_attempt_timeout: Duration

Timeout per connection attempt to Persist PubSub service.

§pubsub_request_timeout: Duration

Timeout per request attempt to Persist PubSub service.

§pubsub_connect_max_backoff: Duration

Maximum backoff when retrying connection establishment to Persist PubSub service.
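
One way a maximum backoff is typically applied is as a cap on a growing retry delay. The sketch below assumes the delay doubles per attempt from some initial value; the doubling, the initial value, and the function name connect_backoff are assumptions of this example, not documented behavior of the PubSub client.

```rust
use std::time::Duration;

/// Backoff before retry number `attempt` (0-based), doubling from `initial`
/// and never exceeding `max_backoff`. Doubling is an assumption of this sketch.
fn connect_backoff(initial: Duration, max_backoff: Duration, attempt: u32) -> Duration {
    // Saturate instead of overflowing for very large attempt counts.
    let factor = 2u32.checked_pow(attempt).unwrap_or(u32::MAX);
    initial
        .checked_mul(factor)
        .unwrap_or(max_backoff)
        .min(max_backoff)
}
```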

§pubsub_client_sender_channel_size: usize

Size of channel used to buffer send messages to PubSub service.

§pubsub_client_receiver_channel_size: usize

Size of channel used to buffer received messages from PubSub service.

§pubsub_server_connection_channel_size: usize

Size of channel used per connection to buffer broadcasted messages from PubSub server.

§pubsub_state_cache_shard_ref_channel_size: usize

Size of channel used by the state cache to broadcast shard state references.

§pubsub_reconnect_backoff: Duration

Backoff after an established connection to Persist PubSub service fails.

§isolated_runtime_worker_threads: usize

Number of worker threads to create for the crate::IsolatedRuntime. Defaults to the number of threads.



impl PersistConfig


pub fn new_default_configs(build_info: &BuildInfo, now: NowFn) -> Self

Returns a new instance of PersistConfig with default tuning and default ConfigSet.


pub fn new(build_info: &BuildInfo, now: NowFn, configs: ConfigSet) -> Self

Returns a new instance of PersistConfig with default tuning and the specified ConfigSet.


pub(crate) fn set_config<T: ConfigType>(&self, cfg: &Config<T>, val: T)


pub fn apply_from(&self, updates: &ConfigUpdates)

Applies the provided updates to this configuration.

You should prefer calling this method over mutating self.configs directly, so that Self::configs_synced_once can be properly maintained.


pub async fn configs_synced_once(&self)

Resolves when configs has been synced at least once with an upstream source, i.e., via Self::apply_from.

If configs has already been synced once at the time the method is called, resolves immediately.

Useful in conjunction with configuration parameters that cannot be dynamically updated once set (e.g., PubSub).
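
The "resolves once synced, immediately if already synced" contract can be illustrated with a blocking analogue built on the standard library. This is only a sketch: SyncedOnce, mark_synced, and wait are hypothetical names, and the real method is async and backed by the configs_synced_once sender rather than a Condvar.

```rust
use std::sync::{Arc, Condvar, Mutex};

/// Hypothetical stand-in for the "synced at least once" latch.
#[derive(Clone, Default)]
struct SyncedOnce {
    inner: Arc<(Mutex<bool>, Condvar)>,
}

impl SyncedOnce {
    /// Called by the update path (the analogue of `apply_from`).
    fn mark_synced(&self) {
        let (flag, cvar) = &*self.inner;
        *flag.lock().unwrap() = true;
        cvar.notify_all();
    }

    /// Blocks until `mark_synced` has run at least once.
    /// Returns immediately if it already has.
    fn wait(&self) {
        let (flag, cvar) = &*self.inner;
        let mut synced = flag.lock().unwrap();
        while !*synced {
            synced = cvar.wait(synced).unwrap();
        }
    }
}
```

A caller that must read a set-once parameter would wait on the latch first and only then read the value.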


pub fn sink_minimum_batch_updates(&self) -> usize

The minimum number of updates that justify writing out a batch in persist_sink’s write_batches operator. (If there are fewer than this minimum number of updates, they’ll be forwarded on to append_batch to be combined and written there.)


pub fn storage_sink_minimum_batch_updates(&self) -> usize

The same as Self::sink_minimum_batch_updates, but for storage persist_sinks.
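
The threshold described for both sink variants amounts to a simple routing decision, sketched here with hypothetical names (Route, route); the actual operators are not shown on this page.

```rust
/// What a hypothetical `write_batches` stage does with a chunk of updates.
#[derive(Debug, PartialEq)]
enum Route {
    /// Enough updates: write a batch right away.
    WriteBatch,
    /// Too few: forward the updates to be combined and written downstream.
    ForwardToAppend,
}

/// Chooses a route by comparing the update count to the configured minimum.
fn route(num_updates: usize, minimum_batch_updates: usize) -> Route {
    if num_updates >= minimum_batch_updates {
        Route::WriteBatch
    } else {
        Route::ForwardToAppend
    }
}
```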


pub fn storage_source_decode_fuel(&self) -> usize

The maximum amount of work to do in the persist_source mfp_and_decode operator before yielding.


pub fn optimize_ignored_data_decode(&self) -> bool

CYA to allow opt-out of a performance optimization to skip decoding ignored data.


pub fn set_reader_lease_duration(&self, val: Duration)

Overrides the value for “persist_reader_lease_duration”.


pub fn set_rollup_threshold(&self, val: usize)

Overrides the value for “persist_rollup_threshold”.


pub fn set_next_listen_batch_retryer(&self, val: RetryParameters)

Overrides the value for the “persist_next_listen_batch_retryer_*” configs.


pub fn new_for_tests() -> Self

Returns a new instance of PersistConfig for tests.


impl PersistConfig

Methods from Deref<Target = ConfigSet>§


pub fn entries(&self) -> impl Iterator<Item = &ConfigEntry>

Returns the configs currently registered to this set.

Trait Implementations§


impl BlobKnobs for PersistConfig


fn operation_timeout(&self) -> Duration

Maximum time allowed for a network call, including retry attempts.

fn operation_attempt_timeout(&self) -> Duration

Maximum time allowed for a single network call.

fn connect_timeout(&self) -> Duration

Maximum time to wait for a socket connection to be made.

fn read_timeout(&self) -> Duration

Maximum time to wait to read the first byte of a response, including connection time.
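
The relationship between the overall operation timeout (which includes retries) and the per-attempt timeout can be sketched as follows. The helper next_attempt_timeout is hypothetical and only shows one plausible way to combine the two limits: each attempt gets the smaller of the per-attempt timeout and whatever remains of the overall budget.

```rust
use std::time::Duration;

/// Timeout to use for the next attempt, or None once the overall budget
/// (`operation_timeout`) has been spent. Hypothetical helper.
fn next_attempt_timeout(
    operation_timeout: Duration,
    operation_attempt_timeout: Duration,
    elapsed: Duration,
) -> Option<Duration> {
    // checked_sub returns None when more time has elapsed than the budget allows.
    let remaining = operation_timeout.checked_sub(elapsed)?;
    if remaining.is_zero() {
        return None;
    }
    Some(remaining.min(operation_attempt_timeout))
}
```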

fn is_cc_active(&self) -> bool

Whether this is running in a “cc” sized cluster.

impl Clone for PersistConfig


fn clone(&self) -> PersistConfig

Returns a copy of the value.

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source.

impl Debug for PersistConfig


fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter.

impl Deref for PersistConfig


type Target = ConfigSet

The resulting type after dereferencing.

fn deref(&self) -> &Self::Target

Dereferences the value.
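
Because of this Deref impl, ConfigSet methods such as entries can be called directly on a PersistConfig. The sketch below shows the mechanism with stand-in types (Inner and Wrapper are hypothetical and much simpler than the real ConfigSet and PersistConfig).

```rust
use std::ops::Deref;

/// Hypothetical stand-in for ConfigSet.
struct Inner {
    names: Vec<String>,
}

impl Inner {
    /// Mirrors the shape of `entries`: iterate over what is registered.
    fn entries(&self) -> impl Iterator<Item = &String> {
        self.names.iter()
    }
}

/// Hypothetical stand-in for PersistConfig.
struct Wrapper {
    configs: Inner,
}

impl Deref for Wrapper {
    type Target = Inner;

    fn deref(&self) -> &Inner {
        &self.configs
    }
}
```

Given a Wrapper value w, the call w.entries() compiles through auto-deref and is equivalent to w.configs.entries().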

impl PostgresClientKnobs for PersistConfig


fn connection_pool_max_size(&self) -> usize

Maximum number of connections allowed in a pool.

fn connection_pool_max_wait(&self) -> Option<Duration>

The maximum time to wait to obtain a connection, if any.

fn connection_pool_ttl(&self) -> Duration

Minimum TTL of a connection. It is expected that connections are routinely culled to balance load to the backing store.

fn connection_pool_ttl_stagger(&self) -> Duration

Minimum time between TTLing connections. Helps stagger reconnections to avoid stampeding the backing store.
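
Taken together, the TTL and the stagger suggest a retirement rule along these lines. This is a sketch under assumptions: should_retire and its inputs are hypothetical, and the actual pool may track these values differently.

```rust
use std::time::Duration;

/// Decides whether a pooled connection may be retired now.
/// A connection qualifies only once it has reached its TTL, and only if
/// enough time has passed since the previous retirement (the stagger).
fn should_retire(
    connection_age: Duration,
    since_last_retirement: Duration,
    ttl: Duration,
    ttl_stagger: Duration,
) -> bool {
    connection_age >= ttl && since_last_retirement >= ttl_stagger
}
```

The stagger condition is what keeps many connections that reached their TTL at the same moment from reconnecting all at once.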

fn connect_timeout(&self) -> Duration

Time to wait for a connection to be made before trying.

fn tcp_user_timeout(&self) -> Duration

TCP user timeout for connection attempts.

Auto Trait Implementations§

Blanket Implementations§


impl<T> Any for T
where T: 'static + ?Sized,


fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T
where T: ?Sized,


fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,


fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T, U> CastInto<U> for T
where U: CastFrom<T>,


fn cast_into(self) -> U

Performs the cast.

impl<T> From<T> for T


fn from(t: T) -> T

Returns the argument unchanged.


impl<T> FromRef<T> for T
where T: Clone,


fn from_ref(input: &T) -> T

Converts to this type from a reference to the input type.

impl<T> FutureExt for T


fn with_context(self, otel_cx: Context) -> WithContext<Self>

Attaches the provided Context to this type, returning a WithContext wrapper.

fn with_current_context(self) -> WithContext<Self>

Attaches the current Context to this type, returning a WithContext wrapper.

impl<T> Instrument for T


fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper.

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper.

impl<T, U> Into<U> for T
where U: From<T>,


fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.


impl<T> IntoRequest<T> for T


fn into_request(self) -> Request<T>

Wrap the input message T in a tonic::Request

impl<Unshared, Shared> IntoShared<Shared> for Unshared
where Shared: FromUnshared<Unshared>,


fn into_shared(self) -> Shared

Creates a shared type from an unshared type.

impl<T> Pointable for T


const ALIGN: usize = _

The alignment of pointer.

type Init = T

The type for initializers.

unsafe fn init(init: <T as Pointable>::Init) -> usize

Initializes a with the given initializer.

unsafe fn deref<'a>(ptr: usize) -> &'a T

Dereferences the given pointer.

unsafe fn deref_mut<'a>(ptr: usize) -> &'a mut T

Mutably dereferences the given pointer.

unsafe fn drop(ptr: usize)

Drops the object pointed to by the given pointer.

impl<T> ProgressEventTimestamp for T
where T: Data + Debug + Any,


fn as_any(&self) -> &(dyn Any + 'static)

Upcasts this ProgressEventTimestamp to Any.

fn type_name(&self) -> &'static str

Returns the name of the concrete type of this object.

impl<P, R> ProtoType<R> for P
where R: RustType<P>,


impl<T> Same for T


type Output = T

Should always be Self

impl<'a, S, T> Semigroup<&'a S> for T
where T: Semigroup<S>,


fn plus_equals(&mut self, rhs: &&'a S)

The method of std::ops::AddAssign, for types that do not implement AddAssign.

impl<T> ToOwned for T
where T: Clone,


type Owned = T

The resulting type after obtaining ownership.

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning.

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning.

impl<T, U> TryFrom<U> for T
where U: Into<T>,


type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,


type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.

impl<V, T> VZip<V> for T
where V: MultiLane<T>,


fn vzip(self) -> V


impl<T> WithSubscriber for T


fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.

impl<T> Data for T
where T: Clone + 'static,