Skip to main content

mz_compute_types/
dyncfgs.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Dyncfgs used by the compute layer.
11
12use std::time::Duration;
13
14use mz_dyncfg::{Config, ConfigSet};
15
16/// Whether rendering should use `half_join2` rather than DD's `half_join` for delta joins.
17///
18/// `half_join2` avoids quadratic behavior in certain join patterns. This flag exists as an escape
19/// hatch to revert to the old implementation if issues arise.
20pub const ENABLE_HALF_JOIN2: Config<bool> = Config::new(
21    "enable_compute_half_join2",
22    true,
23    "Whether compute should use `half_join2` rather than DD's `half_join` to render delta joins.",
24);
25
26/// Use the column-paged merge batcher code path at arrange sites. When
27/// `true`, arrange operators use `Col2ValPagedBatcher` (in
28/// `mz_timely_util::columnar`) and `RowRowColPagedBuilder` (in
29/// `mz_row_spine`) — the columnar-native batcher that the pager can
30/// spill (gated by [`ENABLE_COLUMN_PAGED_BATCHER_SPILL`]). When `false`
31/// (the default), the same arrange sites use the legacy
32/// `Col2ValBatcher` / `RowRowBuilder` (columnation-merger) path that
33/// shipped before #36627. Read at operator construction time; flips
34/// take effect on dataflows created after the change.
35///
36/// Disabled by default while the new path is stabilizing.
37/// `DifferentialJoinHydration*` feature-benchmark scenarios opt in
38/// explicitly so the spill path is measured.
39pub const ENABLE_COLUMN_PAGED_BATCHER: Config<bool> = Config::new(
40    "enable_column_paged_batcher",
41    false,
42    "Use the columnar-native paged merge batcher at arrange sites. When `false` (default), \
43     arranges fall back to the legacy columnation `Col2ValBatcher` / `RowRowBuilder` path.",
44);
45
46/// Allow the column-paged batcher's pager to actually evict chunks
47/// under memory pressure. Only meaningful when
48/// [`ENABLE_COLUMN_PAGED_BATCHER`] is `true`; with the spill flag off
49/// the pager keeps every chunk resident regardless of budget.
50///
51/// Off by default, even when the batcher path itself is on, so the
52/// no-pressure case stays a pure resident operation. Tune the budget /
53/// backend via [`COLUMN_PAGED_BATCHER_BUDGET_FRACTION`].
54pub const ENABLE_COLUMN_PAGED_BATCHER_SPILL: Config<bool> = Config::new(
55    "enable_column_paged_batcher_spill",
56    false,
57    "Allow the column-paged batcher's pager to evict chunks under memory pressure. Only \
58     meaningful when `enable_column_paged_batcher = true`.",
59);
60
61/// Total resident-byte budget the column-paged batcher's tiered policy
62/// (`mz_timely_util::column_pager::policy::TieredPolicy`) is allowed to
63/// hold across all workers in this process, expressed as a fraction of
64/// the replica's announced memory limit. A single
65/// process-wide pool tracks all resident chunks; allocations beyond the
66/// pool spill to the configured backend.
67///
68/// `0.05` (5%) is a reasonable starting point: large enough that the
69/// per-call ColumnBuilder ship-threshold (~2 MiB) fits multiple chunks
70/// per worker, small enough that the merge-batcher's transient state
71/// doesn't crowd out the spine. Set lower to spill more aggressively
72/// under pressure. The computed budget is floored at 128 MiB so the
73/// no-pressure case doesn't page per chunk. Ignored when
74/// `enable_column_paged_batcher_spill` is `false`.
75pub const COLUMN_PAGED_BATCHER_BUDGET_FRACTION: Config<f64> = Config::new(
76    "column_paged_batcher_budget_fraction",
77    0.05,
78    "Fraction of replica memory the column-paged batcher's tiered policy may hold resident \
79     before spilling to the backend. Total budget = max(mem_limit * fraction, 128 MiB).",
80);
81
82/// Whether rendering should use `mz_join_core` rather than DD's `JoinCore::join_core`.
83pub const ENABLE_MZ_JOIN_CORE: Config<bool> = Config::new(
84    "enable_mz_join_core",
85    true,
86    "Whether compute should use `mz_join_core` rather than DD's `JoinCore::join_core` to render \
87     linear joins.",
88);
89
90/// Use sync Timely operators with Tokio tasks for the MV sink.
91pub const ENABLE_SYNC_MV_SINK: Config<bool> = Config::new(
92    "enable_compute_sync_mv_sink",
93    true,
94    "Use sync Timely operators with Tokio tasks for the MV sink.",
95);
96
97/// Whether rendering should use the new MV sink correction buffer implementation.
98pub const ENABLE_CORRECTION_V2: Config<bool> = Config::new(
99    "enable_compute_correction_v2",
100    true,
101    "Whether compute should use the new MV sink correction buffer implementation.",
102);
103
104/// The size factor of subsequent chains in the correction V2 buffer.
105pub const CORRECTION_V2_CHAIN_PROPORTIONALITY: Config<f64> = Config::new(
106    "compute_correction_v2_chain_proportionality",
107    3.0,
108    "The size factor of subsequent chains in the correction V2 buffer.",
109);
110
111/// The byte size of chunks in the correction V2 buffer.
112pub const CORRECTION_V2_CHUNK_SIZE: Config<usize> = Config::new(
113    "compute_correction_v2_chunk_size",
114    8 * 1024,
115    "The byte size of chunks in the correction V2 buffer.",
116);
117
118/// Whether to enable temporal bucketing in compute.
119pub const ENABLE_COMPUTE_TEMPORAL_BUCKETING: Config<bool> = Config::new(
120    "enable_compute_temporal_bucketing",
121    false,
122    "Whether to enable temporal bucketing in compute.",
123);
124
125/// The summary to apply to the frontier in temporal bucketing in compute.
126pub const TEMPORAL_BUCKETING_SUMMARY: Config<Duration> = Config::new(
127    "compute_temporal_bucketing_summary",
128    Duration::from_secs(2),
129    "The summary to apply to frontiers in temporal bucketing in compute.",
130);
131
132/// The yielding behavior with which linear joins should be rendered.
133pub const LINEAR_JOIN_YIELDING: Config<&str> = Config::new(
134    "linear_join_yielding",
135    "work:1000000,time:100",
136    "The yielding behavior compute rendering should apply for linear join operators. Either \
137     'work:<amount>' or 'time:<milliseconds>' or 'work:<amount>,time:<milliseconds>'. Note \
138     that omitting one of 'work' or 'time' will entirely disable join yielding by time or \
139     work, respectively, rather than falling back to some default.",
140);
141
142/// Enable lgalloc.
143pub const ENABLE_LGALLOC: Config<bool> = Config::new("enable_lgalloc", true, "Enable lgalloc.");
144
145/// Enable lgalloc's eager memory return/reclamation feature.
146pub const ENABLE_LGALLOC_EAGER_RECLAMATION: Config<bool> = Config::new(
147    "enable_lgalloc_eager_reclamation",
148    true,
149    "Enable lgalloc's eager return behavior.",
150);
151
152/// The interval at which the background thread wakes.
153pub const LGALLOC_BACKGROUND_INTERVAL: Config<Duration> = Config::new(
154    "lgalloc_background_interval",
155    Duration::from_secs(1),
156    "Scheduling interval for lgalloc's background worker.",
157);
158
159/// Enable lgalloc's eager memory return/reclamation feature.
160pub const LGALLOC_FILE_GROWTH_DAMPENER: Config<usize> = Config::new(
161    "lgalloc_file_growth_dampener",
162    2,
163    "Lgalloc's file growth dampener parameter.",
164);
165
166/// Enable lgalloc's eager memory return/reclamation feature.
167pub const LGALLOC_LOCAL_BUFFER_BYTES: Config<usize> = Config::new(
168    "lgalloc_local_buffer_bytes",
169    64 << 20,
170    "Lgalloc's local buffer bytes parameter.",
171);
172
173/// The bytes to reclaim (slow path) per size class, for each background thread activation.
174pub const LGALLOC_SLOW_CLEAR_BYTES: Config<usize> = Config::new(
175    "lgalloc_slow_clear_bytes",
176    128 << 20,
177    "Clear byte size per size class for every invocation",
178);
179
180/// Interval to run the memory limiter. A zero duration disables the limiter.
181pub const MEMORY_LIMITER_INTERVAL: Config<Duration> = Config::new(
182    "memory_limiter_interval",
183    Duration::from_secs(10),
184    "Interval to run the memory limiter. A zero duration disables the limiter.",
185);
186
187/// Bias to the memory limiter usage factor.
188pub const MEMORY_LIMITER_USAGE_BIAS: Config<f64> = Config::new(
189    "memory_limiter_usage_bias",
190    1.,
191    "Multiplicative bias to the memory limiter's limit.",
192);
193
194/// Burst factor to memory limit.
195pub const MEMORY_LIMITER_BURST_FACTOR: Config<f64> = Config::new(
196    "memory_limiter_burst_factor",
197    0.,
198    "Multiplicative burst factor to the memory limiter's limit.",
199);
200
201/// Enable lgalloc for columnation.
202pub const ENABLE_COLUMNATION_LGALLOC: Config<bool> = Config::new(
203    "enable_columnation_lgalloc",
204    true,
205    "Enable allocating regions from lgalloc.",
206);
207
208/// The interval at which the compute server performs maintenance tasks.
209pub const COMPUTE_SERVER_MAINTENANCE_INTERVAL: Config<Duration> = Config::new(
210    "compute_server_maintenance_interval",
211    Duration::from_millis(10),
212    "The interval at which the compute server performs maintenance tasks. Zero enables maintenance on every iteration.",
213);
214
215/// Maximum number of in-flight bytes emitted by persist_sources feeding dataflows.
216pub const DATAFLOW_MAX_INFLIGHT_BYTES: Config<Option<usize>> = Config::new(
217    "compute_dataflow_max_inflight_bytes",
218    None,
219    "The maximum number of in-flight bytes emitted by persist_sources feeding \
220     compute dataflows in non-cc clusters.",
221);
222
223/// The "physical backpressure" of `compute_dataflow_max_inflight_bytes_cc` has
224/// been replaced in cc replicas by persist lgalloc and we intend to remove it
225/// once everything has switched to cc. In the meantime, this is a CYA to turn
226/// it back on if absolutely necessary.
227pub const DATAFLOW_MAX_INFLIGHT_BYTES_CC: Config<Option<usize>> = Config::new(
228    "compute_dataflow_max_inflight_bytes_cc",
229    None,
230    "The maximum number of in-flight bytes emitted by persist_sources feeding \
231     compute dataflows in cc clusters.",
232);
233
234/// The term `n` in the growth rate `1 + 1/(n + 1)` for `ConsolidatingVec`.
235/// The smallest value `0` corresponds to the greatest allowed growth, of doubling.
236pub const CONSOLIDATING_VEC_GROWTH_DAMPENER: Config<usize> = Config::new(
237    "consolidating_vec_growth_dampener",
238    1,
239    "Dampener in growth rate for consolidating vector size",
240);
241
242/// The number of dataflows that may hydrate concurrently.
243pub const HYDRATION_CONCURRENCY: Config<usize> = Config::new(
244    "compute_hydration_concurrency",
245    4,
246    "Controls how many compute dataflows may hydrate concurrently.",
247);
248
249/// See `src/storage-operators/src/s3_oneshot_sink/parquet.rs` for more details.
250pub const COPY_TO_S3_PARQUET_ROW_GROUP_FILE_RATIO: Config<usize> = Config::new(
251    "copy_to_s3_parquet_row_group_file_ratio",
252    20,
253    "The ratio (defined as a percentage) of row-group size to max-file-size. \
254        Must be <= 100.",
255);
256
257/// See `src/storage-operators/src/s3_oneshot_sink/parquet.rs` for more details.
258pub const COPY_TO_S3_ARROW_BUILDER_BUFFER_RATIO: Config<usize> = Config::new(
259    "copy_to_s3_arrow_builder_buffer_ratio",
260    150,
261    "The ratio (defined as a percentage) of arrow-builder size to row-group size. \
262        Must be >= 100.",
263);
264
265/// The size of each part in the multi-part upload to use when uploading files to S3.
266pub const COPY_TO_S3_MULTIPART_PART_SIZE_BYTES: Config<usize> = Config::new(
267    "copy_to_s3_multipart_part_size_bytes",
268    1024 * 1024 * 8,
269    "The size of each part in a multipart upload to S3.",
270);
271
272/// Main switch to enable or disable replica expiration.
273///
274/// Changes affect existing replicas only after restart.
275pub const ENABLE_COMPUTE_REPLICA_EXPIRATION: Config<bool> = Config::new(
276    "enable_compute_replica_expiration",
277    true,
278    "Main switch to disable replica expiration.",
279);
280
281/// The maximum lifetime of a replica configured as an offset to the replica start time.
282/// Used in temporal filters to drop diffs generated at timestamps beyond the expiration time.
283///
284/// A zero duration implies no expiration. Changing this value does not affect existing replicas,
285/// even when they are restarted.
286pub const COMPUTE_REPLICA_EXPIRATION_OFFSET: Config<Duration> = Config::new(
287    "compute_replica_expiration_offset",
288    Duration::ZERO,
289    "The expiration time offset for replicas. Zero disables expiration.",
290);
291
292/// When enabled, applies the column demands from a MapFilterProject onto the RelationDesc used to
293/// read out of Persist. This allows Persist to prune unneeded columns as a performance
294/// optimization.
295pub const COMPUTE_APPLY_COLUMN_DEMANDS: Config<bool> = Config::new(
296    "compute_apply_column_demands",
297    true,
298    "When enabled, passes applys column demands to the RelationDesc used to read out of Persist.",
299);
300
301/// The amount of output the flat-map operator produces before yielding. Set to a high value to
302/// avoid yielding, or to a low value to yield frequently.
303pub const COMPUTE_FLAT_MAP_FUEL: Config<usize> = Config::new(
304    "compute_flat_map_fuel",
305    1_000_000,
306    "The amount of output the flat-map operator produces before yielding.",
307);
308
309/// Whether to render `as_specific_collection` using a fueled flat-map operator.
310pub const ENABLE_COMPUTE_RENDER_FUELED_AS_SPECIFIC_COLLECTION: Config<bool> = Config::new(
311    "enable_compute_render_fueled_as_specific_collection",
312    true,
313    "When enabled, renders `as_specific_collection` using a fueled flat-map operator.",
314);
315
316/// Whether to apply logical backpressure in compute dataflows.
317pub const ENABLE_COMPUTE_LOGICAL_BACKPRESSURE: Config<bool> = Config::new(
318    "enable_compute_logical_backpressure",
319    false,
320    "When enabled, compute dataflows will apply logical backpressure.",
321);
322
323/// Maximal number of capabilities retained by the logical backpressure operator.
324///
325/// Selecting this value is subtle. If it's too small, it'll diminish the effectiveness of the
326/// logical backpressure operators. If it's too big, we can slow down hydration and cause state
327/// in the operator's implementation to build up.
328///
329/// The default value represents a compromise between these two extremes. We retain some metrics
330/// for 30 days, and the metrics update every minute. The default is exactly this number.
331pub const COMPUTE_LOGICAL_BACKPRESSURE_MAX_RETAINED_CAPABILITIES: Config<Option<usize>> =
332    Config::new(
333        "compute_logical_backpressure_max_retained_capabilities",
334        Some(30 * 24 * 60),
335        "The maximum number of capabilities retained by the logical backpressure operator.",
336    );
337
338/// The slack to round observed timestamps up to.
339///
340/// The default corresponds to Mz's default tick interval, but does not need to do so. Ideally,
341/// it is not smaller than the tick interval, but it can be larger.
342pub const COMPUTE_LOGICAL_BACKPRESSURE_INFLIGHT_SLACK: Config<Duration> = Config::new(
343    "compute_logical_backpressure_inflight_slack",
344    Duration::from_secs(1),
345    "Round observed timestamps to slack.",
346);
347
348/// Whether to enable the peek response stash, for sending back large peek
349/// responses. The response stash will only be used for results that exceed
350/// `compute_peek_response_stash_threshold_bytes`.
351pub const ENABLE_PEEK_RESPONSE_STASH: Config<bool> = Config::new(
352    "enable_compute_peek_response_stash",
353    true,
354    "Whether to enable the peek response stash, for sending back large peek responses. Will only be used for results that exceed compute_peek_response_stash_threshold_bytes.",
355);
356
357/// The threshold for peek response size above which we should use the peek
358/// response stash. Only used if the peek response stash is enabled _and_ if the
359/// query is "streamable" (roughly: doesn't have an ORDER BY).
360pub const PEEK_RESPONSE_STASH_THRESHOLD_BYTES: Config<usize> = Config::new(
361    "compute_peek_response_stash_threshold_bytes",
362    1024 * 10, /* 10KB */
363    "The threshold above which to use the peek response stash, for sending back large peek responses.",
364);
365
366/// The target number of maximum runs in the batches written to the stash.
367///
368/// Setting this reasonably low will make it so batches get consolidated/sorted
369/// concurrently with data being written. Which will in turn make it so that we
370/// have to do less work when reading/consolidating those batches in
371/// `environmentd`.
372pub const PEEK_RESPONSE_STASH_BATCH_MAX_RUNS: Config<usize> = Config::new(
373    "compute_peek_response_stash_batch_max_runs",
374    // The lowest possible setting, do as much work as possible on the
375    // `clusterd` side.
376    2,
377    "The target number of maximum runs in the batches written to the stash.",
378);
379
380/// The target size for batches of rows we read out of the peek stash.
381pub const PEEK_RESPONSE_STASH_READ_BATCH_SIZE_BYTES: Config<usize> = Config::new(
382    "compute_peek_response_stash_read_batch_size_bytes",
383    1024 * 1024 * 100, /* 100mb */
384    "The target size for batches of rows we read out of the peek stash.",
385);
386
387/// The memory budget for consolidating stashed peek responses in
388/// `environmentd`.
389pub const PEEK_RESPONSE_STASH_READ_MEMORY_BUDGET_BYTES: Config<usize> = Config::new(
390    "compute_peek_response_stash_read_memory_budget_bytes",
391    1024 * 1024 * 64, /* 64mb */
392    "The memory budget for consolidating stashed peek responses in environmentd.",
393);
394
395/// The number of batches to pump from the peek result iterator when stashing peek responses.
396pub const PEEK_STASH_NUM_BATCHES: Config<usize> = Config::new(
397    "compute_peek_stash_num_batches",
398    100,
399    "The number of batches to pump from the peek result iterator (in one iteration through the worker loop) when stashing peek responses.",
400);
401
402/// The size of each batch, as number of rows, pumped from the peek result
403/// iterator when stashing peek responses.
404pub const PEEK_STASH_BATCH_SIZE: Config<usize> = Config::new(
405    "compute_peek_stash_batch_size",
406    100000,
407    "The size, as number of rows, of each batch pumped from the peek result iterator (in one iteration through the worker loop) when stashing peek responses.",
408);
409
410/// The collection interval for the Prometheus metrics introspection source.
411///
412/// Set to zero to disable scraping and retract any existing data.
413pub const COMPUTE_PROMETHEUS_INTROSPECTION_SCRAPE_INTERVAL: Config<Duration> = Config::new(
414    "compute_prometheus_introspection_scrape_interval",
415    Duration::from_secs(1),
416    "The collection interval for the Prometheus metrics introspection source. Set to zero to disable.",
417);
418
419/// If set, skip fetching or processing the snapshot data for subscribes when possible.
420pub const SUBSCRIBE_SNAPSHOT_OPTIMIZATION: Config<bool> = Config::new(
421    "compute_subscribe_snapshot_optimization",
422    true,
423    "If set, skip fetching or processing the snapshot data for subscribes when possible.",
424);
425
426/// Temporary flag to de-risk the rollout of a release-blocker fix.
427///
428/// TODO: Remove after one, or a couple, releases.
429pub const MV_SINK_ADVANCE_PERSIST_FRONTIERS: Config<bool> = Config::new(
430    "compute_mv_sink_advance_persist_frontiers",
431    true,
432    "Whether the MV sink's write operator advances its internal persist frontiers to the as_of.",
433);
434
435/// Adds the full set of all compute `Config`s.
436pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet {
437    configs
438        .add(&ENABLE_HALF_JOIN2)
439        .add(&ENABLE_MZ_JOIN_CORE)
440        .add(&ENABLE_SYNC_MV_SINK)
441        .add(&ENABLE_CORRECTION_V2)
442        .add(&CORRECTION_V2_CHAIN_PROPORTIONALITY)
443        .add(&CORRECTION_V2_CHUNK_SIZE)
444        .add(&ENABLE_COMPUTE_TEMPORAL_BUCKETING)
445        .add(&TEMPORAL_BUCKETING_SUMMARY)
446        .add(&LINEAR_JOIN_YIELDING)
447        .add(&ENABLE_LGALLOC)
448        .add(&LGALLOC_BACKGROUND_INTERVAL)
449        .add(&LGALLOC_FILE_GROWTH_DAMPENER)
450        .add(&LGALLOC_LOCAL_BUFFER_BYTES)
451        .add(&LGALLOC_SLOW_CLEAR_BYTES)
452        .add(&MEMORY_LIMITER_INTERVAL)
453        .add(&MEMORY_LIMITER_USAGE_BIAS)
454        .add(&MEMORY_LIMITER_BURST_FACTOR)
455        .add(&ENABLE_LGALLOC_EAGER_RECLAMATION)
456        .add(&ENABLE_COLUMNATION_LGALLOC)
457        .add(&COMPUTE_SERVER_MAINTENANCE_INTERVAL)
458        .add(&DATAFLOW_MAX_INFLIGHT_BYTES)
459        .add(&DATAFLOW_MAX_INFLIGHT_BYTES_CC)
460        .add(&HYDRATION_CONCURRENCY)
461        .add(&COPY_TO_S3_PARQUET_ROW_GROUP_FILE_RATIO)
462        .add(&COPY_TO_S3_ARROW_BUILDER_BUFFER_RATIO)
463        .add(&COPY_TO_S3_MULTIPART_PART_SIZE_BYTES)
464        .add(&ENABLE_COMPUTE_REPLICA_EXPIRATION)
465        .add(&COMPUTE_REPLICA_EXPIRATION_OFFSET)
466        .add(&COMPUTE_APPLY_COLUMN_DEMANDS)
467        .add(&COMPUTE_FLAT_MAP_FUEL)
468        .add(&CONSOLIDATING_VEC_GROWTH_DAMPENER)
469        .add(&ENABLE_COMPUTE_RENDER_FUELED_AS_SPECIFIC_COLLECTION)
470        .add(&ENABLE_COMPUTE_LOGICAL_BACKPRESSURE)
471        .add(&COMPUTE_LOGICAL_BACKPRESSURE_MAX_RETAINED_CAPABILITIES)
472        .add(&COMPUTE_LOGICAL_BACKPRESSURE_INFLIGHT_SLACK)
473        .add(&ENABLE_PEEK_RESPONSE_STASH)
474        .add(&PEEK_RESPONSE_STASH_THRESHOLD_BYTES)
475        .add(&PEEK_RESPONSE_STASH_BATCH_MAX_RUNS)
476        .add(&PEEK_RESPONSE_STASH_READ_BATCH_SIZE_BYTES)
477        .add(&PEEK_RESPONSE_STASH_READ_MEMORY_BUDGET_BYTES)
478        .add(&PEEK_STASH_NUM_BATCHES)
479        .add(&PEEK_STASH_BATCH_SIZE)
480        .add(&COMPUTE_PROMETHEUS_INTROSPECTION_SCRAPE_INTERVAL)
481        .add(&SUBSCRIBE_SNAPSHOT_OPTIMIZATION)
482        .add(&MV_SINK_ADVANCE_PERSIST_FRONTIERS)
483        .add(&ENABLE_COLUMN_PAGED_BATCHER)
484        .add(&ENABLE_COLUMN_PAGED_BATCHER_SPILL)
485        .add(&COLUMN_PAGED_BATCHER_BUDGET_FRACTION)
486}