Skip to main content

mz_compute_types/
dyncfgs.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Dyncfgs used by the compute layer.
11
12use std::time::Duration;
13
14use mz_dyncfg::{Config, ConfigSet};
15
16/// Whether rendering should use `half_join2` rather than DD's `half_join` for delta joins.
17///
18/// `half_join2` avoids quadratic behavior in certain join patterns. This flag exists as an escape
19/// hatch to revert to the old implementation if issues arise.
20pub const ENABLE_HALF_JOIN2: Config<bool> = Config::new(
21    "enable_compute_half_join2",
22    true,
23    "Whether compute should use `half_join2` rather than DD's `half_join` to render delta joins.",
24);
25
26/// Whether rendering should use `mz_join_core` rather than DD's `JoinCore::join_core`.
27pub const ENABLE_MZ_JOIN_CORE: Config<bool> = Config::new(
28    "enable_mz_join_core",
29    true,
30    "Whether compute should use `mz_join_core` rather than DD's `JoinCore::join_core` to render \
31     linear joins.",
32);
33
34/// Whether rendering should use the new MV sink correction buffer implementation.
35pub const ENABLE_CORRECTION_V2: Config<bool> = Config::new(
36    "enable_compute_correction_v2",
37    true,
38    "Whether compute should use the new MV sink correction buffer implementation.",
39);
40
41/// The size factor of subsequent chains in the correction V2 buffer.
42pub const CORRECTION_V2_CHAIN_PROPORTIONALITY: Config<f64> = Config::new(
43    "compute_correction_v2_chain_proportionality",
44    3.0,
45    "The size factor of subsequent chains in the correction V2 buffer.",
46);
47
48/// The byte size of chunks in the correction V2 buffer.
49pub const CORRECTION_V2_CHUNK_SIZE: Config<usize> = Config::new(
50    "compute_correction_v2_chunk_size",
51    8 * 1024,
52    "The byte size of chunks in the correction V2 buffer.",
53);
54
55/// Whether to enable temporal bucketing in compute.
56pub const ENABLE_TEMPORAL_BUCKETING: Config<bool> = Config::new(
57    "enable_compute_temporal_bucketing",
58    false,
59    "Whether to enable temporal bucketing in compute.",
60);
61
62/// The summary to apply to the frontier in temporal bucketing in compute.
63pub const TEMPORAL_BUCKETING_SUMMARY: Config<Duration> = Config::new(
64    "compute_temporal_bucketing_summary",
65    Duration::from_secs(2),
66    "The summary to apply to frontiers in temporal bucketing in compute.",
67);
68
69/// The yielding behavior with which linear joins should be rendered.
70pub const LINEAR_JOIN_YIELDING: Config<&str> = Config::new(
71    "linear_join_yielding",
72    "work:1000000,time:100",
73    "The yielding behavior compute rendering should apply for linear join operators. Either \
74     'work:<amount>' or 'time:<milliseconds>' or 'work:<amount>,time:<milliseconds>'. Note \
75     that omitting one of 'work' or 'time' will entirely disable join yielding by time or \
76     work, respectively, rather than falling back to some default.",
77);
78
79/// Enable lgalloc.
80pub const ENABLE_LGALLOC: Config<bool> = Config::new("enable_lgalloc", true, "Enable lgalloc.");
81
82/// Enable lgalloc's eager memory return/reclamation feature.
83pub const ENABLE_LGALLOC_EAGER_RECLAMATION: Config<bool> = Config::new(
84    "enable_lgalloc_eager_reclamation",
85    true,
86    "Enable lgalloc's eager return behavior.",
87);
88
89/// The interval at which the background thread wakes.
90pub const LGALLOC_BACKGROUND_INTERVAL: Config<Duration> = Config::new(
91    "lgalloc_background_interval",
92    Duration::from_secs(1),
93    "Scheduling interval for lgalloc's background worker.",
94);
95
96/// Enable lgalloc's eager memory return/reclamation feature.
97pub const LGALLOC_FILE_GROWTH_DAMPENER: Config<usize> = Config::new(
98    "lgalloc_file_growth_dampener",
99    2,
100    "Lgalloc's file growth dampener parameter.",
101);
102
103/// Enable lgalloc's eager memory return/reclamation feature.
104pub const LGALLOC_LOCAL_BUFFER_BYTES: Config<usize> = Config::new(
105    "lgalloc_local_buffer_bytes",
106    64 << 20,
107    "Lgalloc's local buffer bytes parameter.",
108);
109
110/// The bytes to reclaim (slow path) per size class, for each background thread activation.
111pub const LGALLOC_SLOW_CLEAR_BYTES: Config<usize> = Config::new(
112    "lgalloc_slow_clear_bytes",
113    128 << 20,
114    "Clear byte size per size class for every invocation",
115);
116
117/// Interval to run the memory limiter. A zero duration disables the limiter.
118pub const MEMORY_LIMITER_INTERVAL: Config<Duration> = Config::new(
119    "memory_limiter_interval",
120    Duration::from_secs(10),
121    "Interval to run the memory limiter. A zero duration disables the limiter.",
122);
123
124/// Bias to the memory limiter usage factor.
125pub const MEMORY_LIMITER_USAGE_BIAS: Config<f64> = Config::new(
126    "memory_limiter_usage_bias",
127    1.,
128    "Multiplicative bias to the memory limiter's limit.",
129);
130
131/// Burst factor to memory limit.
132pub const MEMORY_LIMITER_BURST_FACTOR: Config<f64> = Config::new(
133    "memory_limiter_burst_factor",
134    0.,
135    "Multiplicative burst factor to the memory limiter's limit.",
136);
137
138/// Enable lgalloc for columnation.
139pub const ENABLE_COLUMNATION_LGALLOC: Config<bool> = Config::new(
140    "enable_columnation_lgalloc",
141    true,
142    "Enable allocating regions from lgalloc.",
143);
144
145/// Enable lgalloc for columnar.
146pub const ENABLE_COLUMNAR_LGALLOC: Config<bool> = Config::new(
147    "enable_columnar_lgalloc",
148    true,
149    "Enable allocating aligned regions in columnar from lgalloc.",
150);
151
152/// The interval at which the compute server performs maintenance tasks.
153pub const COMPUTE_SERVER_MAINTENANCE_INTERVAL: Config<Duration> = Config::new(
154    "compute_server_maintenance_interval",
155    Duration::from_millis(10),
156    "The interval at which the compute server performs maintenance tasks. Zero enables maintenance on every iteration.",
157);
158
159/// Maximum number of in-flight bytes emitted by persist_sources feeding dataflows.
160pub const DATAFLOW_MAX_INFLIGHT_BYTES: Config<Option<usize>> = Config::new(
161    "compute_dataflow_max_inflight_bytes",
162    None,
163    "The maximum number of in-flight bytes emitted by persist_sources feeding \
164     compute dataflows in non-cc clusters.",
165);
166
167/// The "physical backpressure" of `compute_dataflow_max_inflight_bytes_cc` has
168/// been replaced in cc replicas by persist lgalloc and we intend to remove it
169/// once everything has switched to cc. In the meantime, this is a CYA to turn
170/// it back on if absolutely necessary.
171pub const DATAFLOW_MAX_INFLIGHT_BYTES_CC: Config<Option<usize>> = Config::new(
172    "compute_dataflow_max_inflight_bytes_cc",
173    None,
174    "The maximum number of in-flight bytes emitted by persist_sources feeding \
175     compute dataflows in cc clusters.",
176);
177
178/// The term `n` in the growth rate `1 + 1/(n + 1)` for `ConsolidatingVec`.
179/// The smallest value `0` corresponds to the greatest allowed growth, of doubling.
180pub const CONSOLIDATING_VEC_GROWTH_DAMPENER: Config<usize> = Config::new(
181    "consolidating_vec_growth_dampener",
182    1,
183    "Dampener in growth rate for consolidating vector size",
184);
185
186/// The number of dataflows that may hydrate concurrently.
187pub const HYDRATION_CONCURRENCY: Config<usize> = Config::new(
188    "compute_hydration_concurrency",
189    4,
190    "Controls how many compute dataflows may hydrate concurrently.",
191);
192
193/// See `src/storage-operators/src/s3_oneshot_sink/parquet.rs` for more details.
194pub const COPY_TO_S3_PARQUET_ROW_GROUP_FILE_RATIO: Config<usize> = Config::new(
195    "copy_to_s3_parquet_row_group_file_ratio",
196    20,
197    "The ratio (defined as a percentage) of row-group size to max-file-size. \
198        Must be <= 100.",
199);
200
201/// See `src/storage-operators/src/s3_oneshot_sink/parquet.rs` for more details.
202pub const COPY_TO_S3_ARROW_BUILDER_BUFFER_RATIO: Config<usize> = Config::new(
203    "copy_to_s3_arrow_builder_buffer_ratio",
204    150,
205    "The ratio (defined as a percentage) of arrow-builder size to row-group size. \
206        Must be >= 100.",
207);
208
209/// The size of each part in the multi-part upload to use when uploading files to S3.
210pub const COPY_TO_S3_MULTIPART_PART_SIZE_BYTES: Config<usize> = Config::new(
211    "copy_to_s3_multipart_part_size_bytes",
212    1024 * 1024 * 8,
213    "The size of each part in a multipart upload to S3.",
214);
215
216/// Main switch to enable or disable replica expiration.
217///
218/// Changes affect existing replicas only after restart.
219pub const ENABLE_COMPUTE_REPLICA_EXPIRATION: Config<bool> = Config::new(
220    "enable_compute_replica_expiration",
221    true,
222    "Main switch to disable replica expiration.",
223);
224
225/// The maximum lifetime of a replica configured as an offset to the replica start time.
226/// Used in temporal filters to drop diffs generated at timestamps beyond the expiration time.
227///
228/// A zero duration implies no expiration. Changing this value does not affect existing replicas,
229/// even when they are restarted.
230pub const COMPUTE_REPLICA_EXPIRATION_OFFSET: Config<Duration> = Config::new(
231    "compute_replica_expiration_offset",
232    Duration::ZERO,
233    "The expiration time offset for replicas. Zero disables expiration.",
234);
235
236/// When enabled, applies the column demands from a MapFilterProject onto the RelationDesc used to
237/// read out of Persist. This allows Persist to prune unneeded columns as a performance
238/// optimization.
239pub const COMPUTE_APPLY_COLUMN_DEMANDS: Config<bool> = Config::new(
240    "compute_apply_column_demands",
241    true,
242    "When enabled, passes applys column demands to the RelationDesc used to read out of Persist.",
243);
244
245/// The amount of output the flat-map operator produces before yielding. Set to a high value to
246/// avoid yielding, or to a low value to yield frequently.
247pub const COMPUTE_FLAT_MAP_FUEL: Config<usize> = Config::new(
248    "compute_flat_map_fuel",
249    1_000_000,
250    "The amount of output the flat-map operator produces before yielding.",
251);
252
253/// Whether to render `as_specific_collection` using a fueled flat-map operator.
254pub const ENABLE_COMPUTE_RENDER_FUELED_AS_SPECIFIC_COLLECTION: Config<bool> = Config::new(
255    "enable_compute_render_fueled_as_specific_collection",
256    true,
257    "When enabled, renders `as_specific_collection` using a fueled flat-map operator.",
258);
259
260/// Whether to apply logical backpressure in compute dataflows.
261pub const ENABLE_COMPUTE_LOGICAL_BACKPRESSURE: Config<bool> = Config::new(
262    "enable_compute_logical_backpressure",
263    false,
264    "When enabled, compute dataflows will apply logical backpressure.",
265);
266
267/// Maximal number of capabilities retained by the logical backpressure operator.
268///
269/// Selecting this value is subtle. If it's too small, it'll diminish the effectiveness of the
270/// logical backpressure operators. If it's too big, we can slow down hydration and cause state
271/// in the operator's implementation to build up.
272///
273/// The default value represents a compromise between these two extremes. We retain some metrics
274/// for 30 days, and the metrics update every minute. The default is exactly this number.
275pub const COMPUTE_LOGICAL_BACKPRESSURE_MAX_RETAINED_CAPABILITIES: Config<Option<usize>> =
276    Config::new(
277        "compute_logical_backpressure_max_retained_capabilities",
278        Some(30 * 24 * 60),
279        "The maximum number of capabilities retained by the logical backpressure operator.",
280    );
281
282/// The slack to round observed timestamps up to.
283///
284/// The default corresponds to Mz's default tick interval, but does not need to do so. Ideally,
285/// it is not smaller than the tick interval, but it can be larger.
286pub const COMPUTE_LOGICAL_BACKPRESSURE_INFLIGHT_SLACK: Config<Duration> = Config::new(
287    "compute_logical_backpressure_inflight_slack",
288    Duration::from_secs(1),
289    "Round observed timestamps to slack.",
290);
291
292/// Whether to enable the peek response stash, for sending back large peek
293/// responses. The response stash will only be used for results that exceed
294/// `compute_peek_response_stash_threshold_bytes`.
295pub const ENABLE_PEEK_RESPONSE_STASH: Config<bool> = Config::new(
296    "enable_compute_peek_response_stash",
297    true,
298    "Whether to enable the peek response stash, for sending back large peek responses. Will only be used for results that exceed compute_peek_response_stash_threshold_bytes.",
299);
300
301/// The threshold for peek response size above which we should use the peek
302/// response stash. Only used if the peek response stash is enabled _and_ if the
303/// query is "streamable" (roughly: doesn't have an ORDER BY).
304pub const PEEK_RESPONSE_STASH_THRESHOLD_BYTES: Config<usize> = Config::new(
305    "compute_peek_response_stash_threshold_bytes",
306    1024 * 10, /* 10KB */
307    "The threshold above which to use the peek response stash, for sending back large peek responses.",
308);
309
310/// The target number of maximum runs in the batches written to the stash.
311///
312/// Setting this reasonably low will make it so batches get consolidated/sorted
313/// concurrently with data being written. Which will in turn make it so that we
314/// have to do less work when reading/consolidating those batches in
315/// `environmentd`.
316pub const PEEK_RESPONSE_STASH_BATCH_MAX_RUNS: Config<usize> = Config::new(
317    "compute_peek_response_stash_batch_max_runs",
318    // The lowest possible setting, do as much work as possible on the
319    // `clusterd` side.
320    2,
321    "The target number of maximum runs in the batches written to the stash.",
322);
323
324/// The target size for batches of rows we read out of the peek stash.
325pub const PEEK_RESPONSE_STASH_READ_BATCH_SIZE_BYTES: Config<usize> = Config::new(
326    "compute_peek_response_stash_read_batch_size_bytes",
327    1024 * 1024 * 100, /* 100mb */
328    "The target size for batches of rows we read out of the peek stash.",
329);
330
331/// The memory budget for consolidating stashed peek responses in
332/// `environmentd`.
333pub const PEEK_RESPONSE_STASH_READ_MEMORY_BUDGET_BYTES: Config<usize> = Config::new(
334    "compute_peek_response_stash_read_memory_budget_bytes",
335    1024 * 1024 * 64, /* 64mb */
336    "The memory budget for consolidating stashed peek responses in environmentd.",
337);
338
339/// The number of batches to pump from the peek result iterator when stashing peek responses.
340pub const PEEK_STASH_NUM_BATCHES: Config<usize> = Config::new(
341    "compute_peek_stash_num_batches",
342    100,
343    "The number of batches to pump from the peek result iterator (in one iteration through the worker loop) when stashing peek responses.",
344);
345
346/// The size of each batch, as number of rows, pumped from the peek result
347/// iterator when stashing peek responses.
348pub const PEEK_STASH_BATCH_SIZE: Config<usize> = Config::new(
349    "compute_peek_stash_batch_size",
350    100000,
351    "The size, as number of rows, of each batch pumped from the peek result iterator (in one iteration through the worker loop) when stashing peek responses.",
352);
353
354/// The collection interval for the Prometheus metrics introspection source.
355///
356/// Set to zero to disable scraping and retract any existing data.
357pub const COMPUTE_PROMETHEUS_INTROSPECTION_SCRAPE_INTERVAL: Config<Duration> = Config::new(
358    "compute_prometheus_introspection_scrape_interval",
359    Duration::from_secs(1),
360    "The collection interval for the Prometheus metrics introspection source. Set to zero to disable.",
361);
362
363/// If set, skip fetching or processing the snapshot data for subscribes when possible.
364pub const SUBSCRIBE_SNAPSHOT_OPTIMIZATION: Config<bool> = Config::new(
365    "compute_subscribe_snapshot_optimization",
366    true,
367    "If set, skip fetching or processing the snapshot data for subscribes when possible.",
368);
369
370/// Temporary flag to de-risk the rollout of a release-blocker fix.
371///
372/// TODO: Remove after one, or a couple, releases.
373pub const MV_SINK_ADVANCE_PERSIST_FRONTIERS: Config<bool> = Config::new(
374    "compute_mv_sink_advance_persist_frontiers",
375    true,
376    "Whether the MV sink's write operator advances its internal persist frontiers to the as_of.",
377);
378
379/// Adds the full set of all compute `Config`s.
380pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet {
381    configs
382        .add(&ENABLE_HALF_JOIN2)
383        .add(&ENABLE_MZ_JOIN_CORE)
384        .add(&ENABLE_CORRECTION_V2)
385        .add(&CORRECTION_V2_CHAIN_PROPORTIONALITY)
386        .add(&CORRECTION_V2_CHUNK_SIZE)
387        .add(&ENABLE_TEMPORAL_BUCKETING)
388        .add(&TEMPORAL_BUCKETING_SUMMARY)
389        .add(&LINEAR_JOIN_YIELDING)
390        .add(&ENABLE_LGALLOC)
391        .add(&LGALLOC_BACKGROUND_INTERVAL)
392        .add(&LGALLOC_FILE_GROWTH_DAMPENER)
393        .add(&LGALLOC_LOCAL_BUFFER_BYTES)
394        .add(&LGALLOC_SLOW_CLEAR_BYTES)
395        .add(&MEMORY_LIMITER_INTERVAL)
396        .add(&MEMORY_LIMITER_USAGE_BIAS)
397        .add(&MEMORY_LIMITER_BURST_FACTOR)
398        .add(&ENABLE_LGALLOC_EAGER_RECLAMATION)
399        .add(&ENABLE_COLUMNATION_LGALLOC)
400        .add(&ENABLE_COLUMNAR_LGALLOC)
401        .add(&COMPUTE_SERVER_MAINTENANCE_INTERVAL)
402        .add(&DATAFLOW_MAX_INFLIGHT_BYTES)
403        .add(&DATAFLOW_MAX_INFLIGHT_BYTES_CC)
404        .add(&HYDRATION_CONCURRENCY)
405        .add(&COPY_TO_S3_PARQUET_ROW_GROUP_FILE_RATIO)
406        .add(&COPY_TO_S3_ARROW_BUILDER_BUFFER_RATIO)
407        .add(&COPY_TO_S3_MULTIPART_PART_SIZE_BYTES)
408        .add(&ENABLE_COMPUTE_REPLICA_EXPIRATION)
409        .add(&COMPUTE_REPLICA_EXPIRATION_OFFSET)
410        .add(&COMPUTE_APPLY_COLUMN_DEMANDS)
411        .add(&COMPUTE_FLAT_MAP_FUEL)
412        .add(&CONSOLIDATING_VEC_GROWTH_DAMPENER)
413        .add(&ENABLE_COMPUTE_RENDER_FUELED_AS_SPECIFIC_COLLECTION)
414        .add(&ENABLE_COMPUTE_LOGICAL_BACKPRESSURE)
415        .add(&COMPUTE_LOGICAL_BACKPRESSURE_MAX_RETAINED_CAPABILITIES)
416        .add(&COMPUTE_LOGICAL_BACKPRESSURE_INFLIGHT_SLACK)
417        .add(&ENABLE_PEEK_RESPONSE_STASH)
418        .add(&PEEK_RESPONSE_STASH_THRESHOLD_BYTES)
419        .add(&PEEK_RESPONSE_STASH_BATCH_MAX_RUNS)
420        .add(&PEEK_RESPONSE_STASH_READ_BATCH_SIZE_BYTES)
421        .add(&PEEK_RESPONSE_STASH_READ_MEMORY_BUDGET_BYTES)
422        .add(&PEEK_STASH_NUM_BATCHES)
423        .add(&PEEK_STASH_BATCH_SIZE)
424        .add(&COMPUTE_PROMETHEUS_INTROSPECTION_SCRAPE_INTERVAL)
425        .add(&SUBSCRIBE_SNAPSHOT_OPTIMIZATION)
426        .add(&MV_SINK_ADVANCE_PERSIST_FRONTIERS)
427}