Struct mz_persist_client::cfg::PersistConfig
pub struct PersistConfig {
pub build_version: Version,
pub hostname: String,
pub is_cc_active: bool,
pub announce_memory_limit: Option<usize>,
pub now: NowFn,
pub configs: Arc<ConfigSet>,
configs_synced_once: Arc<Sender<bool>>,
pub compaction_enabled: bool,
pub compaction_process_requests: Arc<AtomicBool>,
pub compaction_concurrency_limit: usize,
pub compaction_queue_size: usize,
pub compaction_yield_after_n_updates: usize,
pub consensus_connection_pool_max_size: usize,
pub consensus_connection_pool_max_wait: Option<Duration>,
pub writer_lease_duration: Duration,
pub critical_downgrade_interval: Duration,
pub pubsub_connect_attempt_timeout: Duration,
pub pubsub_request_timeout: Duration,
pub pubsub_connect_max_backoff: Duration,
pub pubsub_client_sender_channel_size: usize,
pub pubsub_client_receiver_channel_size: usize,
pub pubsub_server_connection_channel_size: usize,
pub pubsub_state_cache_shard_ref_channel_size: usize,
pub pubsub_reconnect_backoff: Duration,
pub isolated_runtime_worker_threads: usize,
}
The tunable knobs for persist.
Tuning inputs:
- A larger blob_target_size (capped at KEY_VAL_DATA_MAX_LEN) results in fewer entries in consensus state. Until we have compaction and/or incremental state, consensus state grows without bound, so this is a concern. On the other hand, for any “reasonable” blob_target_size (> 100MiB?), it seems we’d end up with a pretty tremendous amount of data in the shard before this became a real issue.
- A larger blob_target_size will result in fewer s3 operations, which are charged per operation. (Hmm, maybe not if we’re charged per call in a multipart op. The S3Blob impl already chunks things at 8MiB.)
- A smaller blob_target_size will result in more even memory usage in readers.
- A larger batch_builder_max_outstanding_parts increases throughput (to a point).
- A smaller batch_builder_max_outstanding_parts provides a bound on the amount of memory used by a writer.
- A larger compaction_heuristic_min_inputs means state size is larger.
- A smaller compaction_heuristic_min_inputs means more compactions happen (higher write amp).
- A larger compaction_heuristic_min_updates means more consolidations are discovered while reading a snapshot (higher read amp and higher space amp).
- A smaller compaction_heuristic_min_updates means more compactions happen (higher write amp).
Tuning logic:
- blob_target_size was initially selected to be an exact multiple of 8MiB (the s3 multipart size) that was in the same neighborhood as our initial max throughput (~250MiB).
- batch_builder_max_outstanding_parts was initially selected to be as small as possible without harming pipelining. 0 means no pipelining, 1 is full pipelining as long as generating data takes less time than writing to s3 (hopefully a fair assumption), 2 is a little extra slop on top of 1.
- compaction_heuristic_min_inputs was set by running the open-loop benchmark with batches of size 10,240 bytes (selected to be small but such that the overhead of our columnar encoding format was less than 10%) and manually increased until the write amp stopped going down. This becomes much less important once we have incremental state. The initial value is a placeholder and should be revisited at some point.
- compaction_heuristic_min_updates was set via a thought experiment. This is an O(n*log(n)) upper bound on the number of unconsolidated updates that would be consolidated if we compacted as the in-mem Spine does. The initial value is a placeholder and should be revisited at some point.
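The blob_target_size bullet above describes picking a size that is an exact multiple of the 8MiB s3 multipart chunk. A minimal sketch of that arithmetic follows. The constant and function names are hypothetical and not part of persist, and the sketch says nothing about the value actually chosen as the default.

```rust
/// The s3 multipart chunk size mentioned in the tuning notes (8MiB).
/// Hypothetical constant, named here for illustration only.
const MULTIPART_CHUNK_BYTES: usize = 8 * 1024 * 1024;

/// Rounds `approx_bytes` down to a multiple of the multipart chunk size,
/// never returning less than a single chunk.
/// Hypothetical helper, not a persist API.
fn round_down_to_chunk_multiple(approx_bytes: usize) -> usize {
    let chunks = (approx_bytes / MULTIPART_CHUNK_BYTES).max(1);
    chunks * MULTIPART_CHUNK_BYTES
}
```

For instance, `round_down_to_chunk_multiple(250 * 1024 * 1024)` gives 248MiB (31 chunks of 8MiB), which stays in the neighborhood of the ~250MiB figure quoted above.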
TODO: Move these tuning notes into SessionVar descriptions once we have SystemVars for most of these.
Fields
build_version: Version
Info about which version of the code is running.
hostname: String
Hostname of this persist user. Stored in state and used for debugging.
is_cc_active: bool
Whether this persist instance is running in a “cc” sized cluster.
announce_memory_limit: Option<usize>
Memory limit of the process, if known.
now: NowFn
A clock to use for all leasing and other non-debugging use.
configs: Arc<ConfigSet>
Persist Configs that can change value dynamically within the lifetime of a process.
TODO(cfg): Entirely replace dynamic with this.
configs_synced_once: Arc<Sender<bool>>
Indicates whether configs has been synced at least once with an upstream source.
compaction_enabled: bool
Whether to physically and logically compact batches in blob storage.
compaction_process_requests: Arc<AtomicBool>
Whether the Compactor will process compaction requests, or drop them on the floor.
compaction_concurrency_limit: usize
In Compactor::compact_and_apply_background, the maximum number of concurrent compaction requests that can execute for a given shard.
compaction_queue_size: usize
In Compactor::compact_and_apply_background, the maximum number of pending compaction requests to queue.
compaction_yield_after_n_updates: usize
In Compactor::compact_and_apply_background, how many updates to encode or decode before voluntarily yielding the task.
consensus_connection_pool_max_size: usize
The maximum size of the connection pool to Postgres/CRDB when performing consensus reads and writes.
consensus_connection_pool_max_wait: Option<Duration>
The maximum time to wait when attempting to obtain a connection from the pool.
writer_lease_duration: Duration
Length of time after a writer’s last operation after which the writer may be expired.
critical_downgrade_interval: Duration
Length of time between critical handles’ calls to downgrade since
pubsub_connect_attempt_timeout: Duration
Timeout per connection attempt to Persist PubSub service.
pubsub_request_timeout: Duration
Timeout per request attempt to Persist PubSub service.
pubsub_connect_max_backoff: Duration
Maximum backoff when retrying connection establishment to Persist PubSub service.
pubsub_client_sender_channel_size: usize
Size of channel used to buffer send messages to PubSub service.
pubsub_client_receiver_channel_size: usize
Size of channel used to buffer received messages from PubSub service.
pubsub_server_connection_channel_size: usize
Size of channel used per connection to buffer broadcasted messages from PubSub server.
pubsub_state_cache_shard_ref_channel_size: usize
Size of channel used by the state cache to broadcast shard state references.
pubsub_reconnect_backoff: Duration
Backoff after an established connection to Persist PubSub service fails.
isolated_runtime_worker_threads: usize
Number of worker threads to create for the crate::IsolatedRuntime, defaults to the number of threads.
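As an illustration of how a field like writer_lease_duration reads in practice, the following is a minimal sketch of a lease-expiry check. The function name, the use of `Duration` values measured from a shared epoch, and the strict `>` comparison are all assumptions made for this example. Persist's actual expiry logic is not shown on this page.

```rust
use std::time::Duration;

/// Returns true if a writer whose last operation happened at `last_op`
/// may be expired at time `now`, given the lease duration.
/// Both timestamps are durations since an arbitrary shared epoch.
/// Hypothetical helper, not a persist API.
fn writer_may_be_expired(last_op: Duration, now: Duration, lease: Duration) -> bool {
    // saturating_sub guards against a clock reading earlier than `last_op`.
    now.saturating_sub(last_op) > lease
}
```

With a 60 second lease and a last operation at t=100s, the writer is still live at t=150s and may be expired at t=161s.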
Implementations
impl PersistConfig
pub fn new_default_configs(build_info: &BuildInfo, now: NowFn) -> Self
Returns a new instance of PersistConfig with default tuning and default ConfigSet.
pub fn new(build_info: &BuildInfo, now: NowFn, configs: ConfigSet) -> Self
Returns a new instance of PersistConfig with default tuning and the specified ConfigSet.
pub(crate) fn set_config<T: ConfigDefault>(&self, cfg: &Config<T>, val: T)
pub fn apply_from(&self, updates: &ConfigUpdates)
Applies the provided updates to this configuration.
You should prefer calling this method over mutating self.configs directly, so that Self::configs_synced_once can be properly maintained.
pub async fn configs_synced_once(&self)
Resolves when configs has been synced at least once with an upstream source, i.e., via Self::apply_from.
If configs has already been synced once at the time the method is called, resolves immediately.
Useful in conjunction with configuration parameters that cannot be dynamically updated once set (e.g., PubSub).
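The "synced at least once" behavior described above can be sketched as a one-way latch. PersistConfig stores an Arc<Sender<bool>> and exposes an async method. The sketch below does not reproduce that. It is a blocking stand-in built on std's Mutex and Condvar, and the type `SyncedOnce` and its methods are hypothetical names used only for illustration.

```rust
use std::sync::{Arc, Condvar, Mutex};

/// A blocking stand-in for a "configs synced at least once" signal.
/// Hypothetical type, not part of persist.
#[derive(Clone, Default)]
struct SyncedOnce {
    inner: Arc<(Mutex<bool>, Condvar)>,
}

impl SyncedOnce {
    /// Marks the configs as synced and wakes every waiter.
    /// Plays the role that apply_from plays in the text above.
    fn mark_synced(&self) {
        let (flag, cvar) = &*self.inner;
        *flag.lock().unwrap() = true;
        cvar.notify_all();
    }

    /// Blocks until `mark_synced` has been called at least once.
    /// Returns immediately if it already has been.
    fn wait(&self) {
        let (flag, cvar) = &*self.inner;
        let mut synced = flag.lock().unwrap();
        while !*synced {
            synced = cvar.wait(synced).unwrap();
        }
    }

    /// Reports whether the first sync has happened yet.
    fn is_synced(&self) -> bool {
        *self.inner.0.lock().unwrap()
    }
}
```

A caller that depends on a parameter that cannot change after startup would call `wait()` before reading it, so that it sees the upstream value rather than the compiled-in default.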
pub fn storage_source_decode_fuel(&self) -> usize
The maximum amount of work to do in the persist_source mfp_and_decode operator before yielding.
pub fn optimize_ignored_data_decode(&self) -> bool
Escape hatch to allow opting out of a performance optimization that skips decoding ignored data.
pub fn set_reader_lease_duration(&self, val: Duration)
Overrides the value for “persist_reader_lease_duration”.
pub fn set_rollup_threshold(&self, val: usize)
Overrides the value for “persist_rollup_threshold”.
pub fn set_next_listen_batch_retryer(&self, val: RetryParameters)
Overrides the value for the “persist_next_listen_batch_retryer_*” configs.
pub fn disable_compaction(&self)
pub fn enable_compaction(&self)
pub fn new_for_tests() -> Self
Returns a new instance of PersistConfig for tests.
impl PersistConfig
pub(crate) const DEFAULT_FALLBACK_ROLLUP_THRESHOLD_MULTIPLIER: usize = 3usize
pub fn set_state_versions_recent_live_diffs_limit(&self, val: usize)
Methods from Deref<Target = ConfigSet>
pub fn entries(&self) -> impl Iterator<Item = &ConfigEntry>
Returns the configs currently registered to this set.
pub fn entry(&self, name: &str) -> Option<&ConfigEntry>
Returns the config with name registered to this set, if one exists.
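These methods are callable directly on a PersistConfig because of the Deref implementation. The following toy sketch shows the mechanism. `ToyConfigSet` and `ToyPersistConfig` are hypothetical stand-ins, not the real ConfigSet or PersistConfig, and the lookup is deliberately simplified to a list of names.

```rust
use std::ops::Deref;
use std::sync::Arc;

/// Toy stand-in for ConfigSet: just a list of registered config names.
struct ToyConfigSet {
    names: Vec<String>,
}

impl ToyConfigSet {
    /// Returns the registered name matching `name`, if one exists.
    fn entry(&self, name: &str) -> Option<&String> {
        self.names.iter().find(|n| n.as_str() == name)
    }
}

/// Toy stand-in for PersistConfig, holding the set behind an Arc.
struct ToyPersistConfig {
    configs: Arc<ToyConfigSet>,
}

impl Deref for ToyPersistConfig {
    type Target = ToyConfigSet;

    fn deref(&self) -> &ToyConfigSet {
        // &Arc<ToyConfigSet> coerces to &ToyConfigSet at the return site.
        &self.configs
    }
}
```

With this in place, `cfg.entry("persist_rollup_threshold")` resolves through `deref` to the method on the inner set, with no explicit `cfg.configs` access.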
Trait Implementations
impl BlobKnobs for PersistConfig
fn operation_timeout(&self) -> Duration
fn operation_attempt_timeout(&self) -> Duration
fn connect_timeout(&self) -> Duration
fn read_timeout(&self) -> Duration
fn is_cc_active(&self) -> bool
impl Clone for PersistConfig
fn clone(&self) -> PersistConfig
fn clone_from(&mut self, source: &Self)
impl Debug for PersistConfig
impl Deref for PersistConfig
impl PostgresClientKnobs for PersistConfig
fn connection_pool_max_size(&self) -> usize
fn connection_pool_max_wait(&self) -> Option<Duration>
fn connection_pool_ttl(&self) -> Duration
fn connection_pool_ttl_stagger(&self) -> Duration
fn connect_timeout(&self) -> Duration
fn tcp_user_timeout(&self) -> Duration
Auto Trait Implementations
impl Freeze for PersistConfig
impl !RefUnwindSafe for PersistConfig
impl Send for PersistConfig
impl Sync for PersistConfig
impl Unpin for PersistConfig
impl !UnwindSafe for PersistConfig
Blanket Implementations
impl<T> BorrowMut<T> for T where
T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
impl<T> CloneToUninit for T where
T: Clone,
default unsafe fn clone_to_uninit(&self, dst: *mut T)
Nightly-only experimental API (clone_to_uninit).
impl<T> FutureExt for T
fn with_context(self, otel_cx: Context) -> WithContext<Self>
fn with_current_context(self) -> WithContext<Self>
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> IntoRequest<T> for T
fn into_request(self) -> Request<T>
Wrap the input message T in a tonic::Request
impl<T> Pointable for T
impl<T> ProgressEventTimestamp for T
impl<P, R> ProtoType<R> for P where
R: RustType<P>,
fn into_rust(self) -> Result<R, TryFromProtoError>
See RustType::from_proto.
fn from_rust(rust: &R) -> P
See RustType::into_proto.
impl<'a, S, T> Semigroup<&'a S> for T where
T: Semigroup<S>,
fn plus_equals(&mut self, rhs: &&'a S)
The method of std::ops::AddAssign, for types that do not implement AddAssign.