mz_persist_client/internal/compact.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::BTreeSet;
11use std::fmt::Debug;
12use std::marker::PhantomData;
13use std::mem;
14use std::sync::Arc;
15use std::time::{Duration, Instant};
16
17use anyhow::anyhow;
18use differential_dataflow::difference::Semigroup;
19use differential_dataflow::lattice::Lattice;
20use differential_dataflow::trace::Description;
21use futures::{Stream, pin_mut};
22use futures_util::StreamExt;
23use itertools::Either;
24use mz_dyncfg::Config;
25use mz_ore::cast::CastFrom;
26use mz_ore::error::ErrorExt;
27use mz_ore::now::NowFn;
28use mz_ore::soft_assert_or_log;
29use mz_persist::location::Blob;
30use mz_persist_types::part::Part;
31use mz_persist_types::{Codec, Codec64};
32use timely::PartialOrder;
33use timely::progress::{Antichain, Timestamp};
34use tokio::sync::mpsc::Sender;
35use tokio::sync::{TryAcquireError, mpsc, oneshot};
36use tracing::{Instrument, Span, debug, debug_span, error, trace, warn};
37
38use crate::async_runtime::IsolatedRuntime;
39use crate::batch::{BatchBuilderConfig, BatchBuilderInternal, BatchParts, PartDeletes};
40use crate::cfg::{
41    COMPACTION_HEURISTIC_MIN_INPUTS, COMPACTION_HEURISTIC_MIN_PARTS,
42    COMPACTION_HEURISTIC_MIN_UPDATES, COMPACTION_MEMORY_BOUND_BYTES,
43    GC_BLOB_DELETE_CONCURRENCY_LIMIT, MiB,
44};
45use crate::fetch::{FetchBatchFilter, FetchConfig};
46use crate::internal::encoding::Schemas;
47use crate::internal::gc::GarbageCollector;
48use crate::internal::machine::Machine;
49use crate::internal::maintenance::RoutineMaintenance;
50use crate::internal::metrics::ShardMetrics;
51use crate::internal::state::{HollowBatch, RunMeta, RunOrder, RunPart};
52use crate::internal::trace::{
53    ActiveCompaction, ApplyMergeResult, CompactionInput, FueledMergeRes, IdHollowBatch, SpineId,
54    id_range,
55};
56use crate::iter::{Consolidator, StructuredSort};
57use crate::{Metrics, PersistConfig, ShardId};
58
59/// A request for compaction.
60///
61/// This is similar to FueledMergeReq, but intentionally a different type. If we
62/// move compaction to an rpc server, this one will become a protobuf; the type
63/// parameters will become names of codecs to look up in some registry.
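///
/// A minimal construction sketch (mirroring the tests at the bottom of this
/// file; `b0` and `b1` are assumed to be `HollowBatch`es covering `[0, 1)` and
/// `[1, 2)` respectively):
///
/// ```ignore
/// let req = CompactReq {
///     shard_id,
///     desc: Description::new(
///         b0.desc.lower().clone(),
///         b1.desc.upper().clone(),
///         Antichain::from_elem(10u64),
///     ),
///     inputs: vec![
///         IdHollowBatch { id: SpineId(0, 1), batch: Arc::new(b0) },
///         IdHollowBatch { id: SpineId(1, 2), batch: Arc::new(b1) },
///     ],
/// };
/// ```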
64#[derive(Debug, Clone)]
65pub struct CompactReq<T> {
66    /// The shard the input and output batches belong to.
67    pub shard_id: ShardId,
68    /// A description for the output batch.
69    pub desc: Description<T>,
70    /// The updates to include in the output batch. Any data in these outside of
71    /// the output description's bounds should be ignored.
72    pub inputs: Vec<IdHollowBatch<T>>,
73}
74
75/// A response from compaction.
76#[derive(Debug)]
77pub struct CompactRes<T> {
78    /// The compacted batch.
79    pub output: HollowBatch<T>,
80    /// The runs that were compacted together to produce the output batch.
81    pub input: CompactionInput,
82}
83
84/// A snapshot of dynamic configs to make it easier to reason about an
85/// individual run of compaction.
86#[derive(Debug, Clone)]
87pub struct CompactConfig {
88    pub(crate) compaction_memory_bound_bytes: usize,
89    pub(crate) compaction_yield_after_n_updates: usize,
90    pub(crate) version: semver::Version,
91    pub(crate) batch: BatchBuilderConfig,
92    pub(crate) fetch_config: FetchConfig,
93    pub(crate) now: NowFn,
94}
95
96impl CompactConfig {
97    /// Initialize the compaction config from Persist configuration.
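    ///
    /// A usage sketch (`persist_cfg` and `shard_id` are assumed to be in scope):
    ///
    /// ```ignore
    /// let cfg = CompactConfig::new(&persist_cfg, shard_id);
    /// // compact_stream later clones this per chunk and adjusts e.g.
    /// // cfg.batch.batch_builder_max_outstanding_parts to spend leftover memory.
    /// ```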
98    pub fn new(value: &PersistConfig, shard_id: ShardId) -> Self {
99        CompactConfig {
100            compaction_memory_bound_bytes: COMPACTION_MEMORY_BOUND_BYTES.get(value),
101            compaction_yield_after_n_updates: value.compaction_yield_after_n_updates,
102            version: value.build_version.clone(),
103            batch: BatchBuilderConfig::new(value, shard_id),
104            fetch_config: FetchConfig::from_persist_config(value),
105            now: value.now.clone(),
106        }
107    }
108}
109
110/// A service for performing physical and logical compaction.
111///
112/// This will possibly be called over RPC in the future. Physical compaction is
113/// merging adjacent batches. Logical compaction is advancing timestamps to a
114/// new since and consolidating the resulting updates.
115#[derive(Debug)]
116pub struct Compactor<K, V, T, D> {
117    cfg: PersistConfig,
118    metrics: Arc<Metrics>,
119    sender: Sender<(
120        Instant,
121        CompactReq<T>,
122        Machine<K, V, T, D>,
123        oneshot::Sender<Result<(), anyhow::Error>>,
124    )>,
125    _phantom: PhantomData<fn() -> D>,
126}
127
128impl<K, V, T, D> Clone for Compactor<K, V, T, D> {
129    fn clone(&self) -> Self {
130        Compactor {
131            cfg: self.cfg.clone(),
132            metrics: Arc::clone(&self.metrics),
133            sender: self.sender.clone(),
134            _phantom: Default::default(),
135        }
136    }
137}
138
139/// In Compactor::compact_and_apply_background, the minimum amount of time to
140/// allow a compaction request to run before timing it out. A request may be
141/// given a timeout greater than this value depending on the inputs' size.
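///
/// A sketch of how `compact_and_apply` derives the actual timeout (the input
/// size below is illustrative):
///
/// ```text
/// timeout = max(persist_compaction_minimum_timeout, 1s per MiB of input)
/// e.g. with the 90s default and 600 MiB of input: max(90s, 600s) = 600s
/// ```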
142pub(crate) const COMPACTION_MINIMUM_TIMEOUT: Config<Duration> = Config::new(
143    "persist_compaction_minimum_timeout",
144    Duration::from_secs(90),
145    "\
146    The minimum amount of time to allow a persist compaction request to run \
147    before timing it out (Materialize).",
148);
149
150pub(crate) const COMPACTION_USE_MOST_RECENT_SCHEMA: Config<bool> = Config::new(
151    "persist_compaction_use_most_recent_schema",
152    true,
153    "\
154    Use the most recent schema from all the Runs that are currently being \
155    compacted, instead of the schema on the current write handle (Materialize).
156    ",
157);
158
159pub(crate) const COMPACTION_CHECK_PROCESS_FLAG: Config<bool> = Config::new(
160    "persist_compaction_check_process_flag",
161    true,
162    "Whether Compactor will obey the process_requests flag in PersistConfig, \
163        which allows dynamically disabling compaction. If false, all compaction requests will be processed.",
164);
165
166/// Create a [`CompactionInput::IdRange`] from a set of [`SpineId`]s.
167fn input_id_range(ids: BTreeSet<SpineId>) -> CompactionInput {
168    let id = id_range(ids);
169
170    CompactionInput::IdRange(id)
171}
172
173impl<K, V, T, D> Compactor<K, V, T, D>
174where
175    K: Debug + Codec,
176    V: Debug + Codec,
177    T: Timestamp + Lattice + Codec64 + Sync,
178    D: Semigroup + Ord + Codec64 + Send + Sync,
179{
180    pub fn new(
181        cfg: PersistConfig,
182        metrics: Arc<Metrics>,
183        write_schemas: Schemas<K, V>,
184        gc: GarbageCollector<K, V, T, D>,
185    ) -> Self {
186        let (compact_req_sender, mut compact_req_receiver) = mpsc::channel::<(
187            Instant,
188            CompactReq<T>,
189            Machine<K, V, T, D>,
190            oneshot::Sender<Result<(), anyhow::Error>>,
191        )>(cfg.compaction_queue_size);
192        let concurrency_limit = Arc::new(tokio::sync::Semaphore::new(
193            cfg.compaction_concurrency_limit,
194        ));
195        let check_process_requests = COMPACTION_CHECK_PROCESS_FLAG.handle(&cfg.configs);
196        let process_requests = Arc::clone(&cfg.compaction_process_requests);
197
198        // spin off a single task responsible for executing compaction requests.
199        // work is enqueued into the task through a channel
200        let _worker_handle = mz_ore::task::spawn(|| "PersistCompactionScheduler", async move {
201            while let Some((enqueued, req, machine, completer)) = compact_req_receiver.recv().await
202            {
203                assert_eq!(req.shard_id, machine.shard_id());
204                let metrics = Arc::clone(&machine.applier.metrics);
205
206                // Only allow skipping compaction requests if the dyncfg is enabled.
207                if check_process_requests.get()
208                    && !process_requests.load(std::sync::atomic::Ordering::Relaxed)
209                {
210                    // Respond to the requester, track in our metrics, and log
211                    // that compaction is disabled.
212                    let _ = completer.send(Err(anyhow::anyhow!("compaction disabled")));
213                    metrics.compaction.disabled.inc();
214                    tracing::warn!(shard_id = ?req.shard_id, "Dropping compaction request on the floor.");
215
216                    continue;
217                }
218
219                let permit = {
220                    let inner = Arc::clone(&concurrency_limit);
221                    // perform a non-blocking attempt to acquire a permit so we can
222                    // record how often we're ever blocked on the concurrency limit
223                    match inner.try_acquire_owned() {
224                        Ok(permit) => permit,
225                        Err(TryAcquireError::NoPermits) => {
226                            metrics.compaction.concurrency_waits.inc();
227                            Arc::clone(&concurrency_limit)
228                                .acquire_owned()
229                                .await
230                                .expect("semaphore is never closed")
231                        }
232                        Err(TryAcquireError::Closed) => {
233                            // should never happen in practice. the semaphore is
234                            // never explicitly closed, nor will it close on Drop
235                            warn!("semaphore for shard {} is closed", machine.shard_id());
236                            continue;
237                        }
238                    }
239                };
240                metrics
241                    .compaction
242                    .queued_seconds
243                    .inc_by(enqueued.elapsed().as_secs_f64());
244
245                let write_schemas = write_schemas.clone();
246
247                let compact_span =
248                    debug_span!(parent: None, "compact::apply", shard_id=%machine.shard_id());
249                compact_span.follows_from(&Span::current());
250                let gc = gc.clone();
251                mz_ore::task::spawn(|| "PersistCompactionWorker", async move {
252                    let res = Self::compact_and_apply(&machine, req, write_schemas)
253                        .instrument(compact_span)
254                        .await;
255                    if let Ok(maintenance) = res {
256                        maintenance.start_performing(&machine, &gc);
257                    }
258
259                    // we can safely ignore errors here, it's possible the caller
260                    // wasn't interested in waiting and dropped their receiver
261                    let _ = completer.send(Ok(()));
262
263                    // moves `permit` into async scope so it can be dropped upon completion
264                    drop(permit);
265                });
266            }
267        });
268
269        Compactor {
270            cfg,
271            metrics,
272            sender: compact_req_sender,
273            _phantom: PhantomData,
274        }
275    }
276
277    /// Enqueues a [CompactReq] to be consumed by the compaction background task when available.
278    ///
279    /// Returns a receiver that indicates when compaction has completed. The receiver can be
280    /// safely dropped at any time if the caller does not wish to wait on completion.
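    ///
    /// A caller-side sketch (as exercised by the tests below; `compactor`, `req`,
    /// and `machine` are assumed to be in scope):
    ///
    /// ```ignore
    /// if let Some(receiver) = compactor.compact_and_apply_background(req, &machine) {
    ///     // Waiting is optional; the receiver may also just be dropped.
    ///     match receiver.await {
    ///         Ok(Ok(())) => { /* compaction request was processed */ }
    ///         Ok(Err(err)) => { /* e.g. compaction was disabled */ }
    ///         Err(_) => { /* the compactor shut down before responding */ }
    ///     }
    /// }
    /// ```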
281    pub fn compact_and_apply_background(
282        &self,
283        req: CompactReq<T>,
284        machine: &Machine<K, V, T, D>,
285    ) -> Option<oneshot::Receiver<Result<(), anyhow::Error>>> {
286        // Run some initial heuristics to ignore some requests for compaction.
287        // We don't gain much from e.g. compacting two very small batches that
288        // were just written, but it does result in non-trivial blob traffic
289        // (especially in aggregate). This heuristic is something we'll need to
290        // tune over time.
291        let should_compact = req.inputs.len() >= COMPACTION_HEURISTIC_MIN_INPUTS.get(&self.cfg)
292            || req
293                .inputs
294                .iter()
295                .map(|x| x.batch.part_count())
296                .sum::<usize>()
297                >= COMPACTION_HEURISTIC_MIN_PARTS.get(&self.cfg)
298            || req.inputs.iter().map(|x| x.batch.len).sum::<usize>()
299                >= COMPACTION_HEURISTIC_MIN_UPDATES.get(&self.cfg);
300        if !should_compact {
301            self.metrics.compaction.skipped.inc();
302            return None;
303        }
304
305        let (compaction_completed_sender, compaction_completed_receiver) = oneshot::channel();
306        let new_compaction_sender = self.sender.clone();
307
308        self.metrics.compaction.requested.inc();
309        // NB: we intentionally pass along the input machine, as it ought to come from the
310        // writer that generated the compaction request / maintenance. this machine has a
311        // spine structure that generated the request, so it has a much better chance of
312        // merging and committing the result than a machine kept up-to-date through state
313        // diffs, which may have a different spine structure less amenable to merging.
314        let send = new_compaction_sender.try_send((
315            Instant::now(),
316            req,
317            machine.clone(),
318            compaction_completed_sender,
319        ));
320        if let Err(_) = send {
321            self.metrics.compaction.dropped.inc();
322            return None;
323        }
324
325        Some(compaction_completed_receiver)
326    }
327
328    pub(crate) async fn compact_and_apply(
329        machine: &Machine<K, V, T, D>,
330        req: CompactReq<T>,
331        write_schemas: Schemas<K, V>,
332    ) -> Result<RoutineMaintenance, anyhow::Error> {
333        let metrics = Arc::clone(&machine.applier.metrics);
334        metrics.compaction.started.inc();
335        let start = Instant::now();
336
337        // pick a timeout for our compaction request proportional to the amount
338        // of data that must be read (with a minimum set by PersistConfig)
339        let total_input_bytes = req
340            .inputs
341            .iter()
342            .map(|batch| batch.batch.encoded_size_bytes())
343            .sum::<usize>();
344        let timeout = Duration::max(
345            // either our minimum timeout
346            COMPACTION_MINIMUM_TIMEOUT.get(&machine.applier.cfg),
347            // or 1s per MB of input data
348            Duration::from_secs(u64::cast_from(total_input_bytes / MiB)),
349        );
350        // always use the most recent schema from all the Runs we're compacting, to prevent Compactors
351        // created before the schema was evolved from trying to "de-evolve" a Part.
352        let compaction_schema_id = req
353            .inputs
354            .iter()
355            .flat_map(|batch| batch.batch.run_meta.iter())
356            .filter_map(|run_meta| run_meta.schema)
357            // It's an invariant that SchemaIds are ordered.
358            .max();
359        let maybe_compaction_schema = match compaction_schema_id {
360            Some(id) => machine
361                .get_schema(id)
362                .map(|(key_schema, val_schema)| (id, key_schema, val_schema)),
363            None => None,
364        };
365        let use_most_recent_schema = COMPACTION_USE_MOST_RECENT_SCHEMA.get(&machine.applier.cfg);
366
367        let compaction_schema = match maybe_compaction_schema {
368            Some((id, key_schema, val_schema)) if use_most_recent_schema => {
369                metrics.compaction.schema_selection.recent_schema.inc();
370                Schemas {
371                    id: Some(id),
372                    key: Arc::new(key_schema),
373                    val: Arc::new(val_schema),
374                }
375            }
376            Some(_) => {
377                metrics.compaction.schema_selection.disabled.inc();
378                write_schemas
379            }
380            None => {
381                metrics.compaction.schema_selection.no_schema.inc();
382                write_schemas
383            }
384        };
385
386        trace!(
387            "compaction request for {} MiB ({} bytes), with timeout of {}s, and schema {:?}.",
388            total_input_bytes / MiB,
389            total_input_bytes,
390            timeout.as_secs_f64(),
391            compaction_schema.id,
392        );
393
394        let isolated_runtime = Arc::clone(&machine.isolated_runtime);
395        let machine_clone = machine.clone();
396        let metrics_clone = Arc::clone(&machine.applier.metrics);
397        let compact_span = debug_span!("compact::consolidate");
398        let res = tokio::time::timeout(
399            timeout,
400            // Compaction is cpu intensive, so be polite and spawn it on the isolated runtime.
401            isolated_runtime.spawn_named(
402                || "persist::compact::consolidate",
403                async move {
404                    // If the batches we are compacting were written with old versions of persist,
405                    // we may not have run UUIDs for them, meaning we don't have enough info to
406                    // safely compact them incrementally.
407                    let all_runs_have_uuids = req
408                        .inputs
409                        .iter()
410                        .all(|x| x.batch.runs().all(|(meta, _)| meta.id.is_some()));
411                    let all_runs_have_len = req
412                        .inputs
413                        .iter()
414                        .all(|x| x.batch.runs().all(|(meta, _)| meta.len.is_some()));
415
416                    let compact_cfg =
417                        CompactConfig::new(&machine_clone.applier.cfg, machine_clone.shard_id());
418                    let incremental_enabled = compact_cfg.batch.enable_incremental_compaction
419                        && all_runs_have_uuids
420                        && all_runs_have_len;
421                    let stream = Self::compact_stream(
422                        compact_cfg,
423                        Arc::clone(&machine_clone.applier.state_versions.blob),
424                        Arc::clone(&metrics_clone),
425                        Arc::clone(&machine_clone.applier.shard_metrics),
426                        Arc::clone(&machine_clone.isolated_runtime),
427                        req.clone(),
428                        compaction_schema,
429                        incremental_enabled,
430                    );
431
432                    let maintenance = if incremental_enabled {
433                        let mut maintenance = RoutineMaintenance::default();
434                        pin_mut!(stream);
435                        while let Some(res) = stream.next().await {
436                            let res = res?;
437                            let new_maintenance =
438                                Self::apply(res, &metrics_clone, &machine_clone).await?;
439                            maintenance.merge(new_maintenance);
440                        }
441                        maintenance
442                    } else {
443                        let res = Self::compact_all(stream, req.clone()).await?;
444                        Self::apply(
445                            FueledMergeRes {
446                                output: res.output,
447                                input: CompactionInput::Legacy,
448                                new_active_compaction: None,
449                            },
450                            &metrics_clone,
451                            &machine_clone,
452                        )
453                        .await?
454                    };
455
456                    Ok::<_, anyhow::Error>(maintenance)
457                }
458                .instrument(compact_span),
459            ),
460        )
461        .await;
462
463        metrics
464            .compaction
465            .seconds
466            .inc_by(start.elapsed().as_secs_f64());
467        let res = res
468            .map_err(|e| {
469                metrics.compaction.timed_out.inc();
470                anyhow!(
471                    "compaction timed out after {}s: {}",
472                    timeout.as_secs_f64(),
473                    e
474                )
475            })?
476            .map_err(|e| anyhow!(e))?;
477
478        match res {
479            Ok(maintenance) => Ok(maintenance),
480            Err(err) => {
481                metrics.compaction.failed.inc();
482                debug!(
483                    "compaction for {} failed: {}",
484                    machine.shard_id(),
485                    err.display_with_causes()
486                );
487                Err(err)
488            }
489        }
490    }
491
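    /// Drains the compaction stream and stitches the per-chunk outputs into a
    /// single [CompactRes] covering the whole request, concatenating parts and
    /// re-offsetting run splits along the way.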
492    pub async fn compact_all(
493        stream: impl Stream<Item = Result<FueledMergeRes<T>, anyhow::Error>>,
494        req: CompactReq<T>,
495    ) -> Result<CompactRes<T>, anyhow::Error> {
496        pin_mut!(stream);
497
498        let mut all_parts = vec![];
499        let mut all_run_splits = vec![];
500        let mut all_run_meta = vec![];
501        let mut len = 0;
502
503        while let Some(res) = stream.next().await {
504            let res = res?.output;
505            let (parts, updates, run_meta, run_splits) =
506                (res.parts, res.len, res.run_meta, res.run_splits);
507
508            if updates == 0 {
509                continue;
510            }
511
512            let run_offset = all_parts.len();
513            if !all_parts.is_empty() {
514                all_run_splits.push(run_offset);
515            }
516            all_run_splits.extend(run_splits.iter().map(|r| r + run_offset));
517            all_run_meta.extend(run_meta);
518            all_parts.extend(parts);
519            len += updates;
520        }
521
522        let batches = req.inputs.iter().map(|x| x.id).collect::<BTreeSet<_>>();
523        let input = input_id_range(batches);
524
525        Ok(CompactRes {
526            output: HollowBatch::new(
527                req.desc.clone(),
528                all_parts,
529                len,
530                all_run_meta,
531                all_run_splits,
532            ),
533            input,
534        })
535    }
536
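    /// Applies a compaction result to the shard's state via `Machine::merge_res`,
    /// recording metrics for each outcome. If the result could not be applied,
    /// the freshly written output parts are deleted so they are not leaked.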
537    pub async fn apply(
538        res: FueledMergeRes<T>,
539        metrics: &Metrics,
540        machine: &Machine<K, V, T, D>,
541    ) -> Result<RoutineMaintenance, anyhow::Error> {
542        let (apply_merge_result, maintenance) = machine.merge_res(&res).await;
543
544        match &apply_merge_result {
545            ApplyMergeResult::AppliedExact => {
546                metrics.compaction.applied.inc();
547                metrics.compaction.applied_exact_match.inc();
548                machine.applier.shard_metrics.compaction_applied.inc();
549            }
550            ApplyMergeResult::AppliedSubset => {
551                metrics.compaction.applied.inc();
552                metrics.compaction.applied_subset_match.inc();
553                machine.applier.shard_metrics.compaction_applied.inc();
554            }
555            ApplyMergeResult::NotAppliedNoMatch
556            | ApplyMergeResult::NotAppliedInvalidSince
557            | ApplyMergeResult::NotAppliedTooManyUpdates => {
558                if let ApplyMergeResult::NotAppliedTooManyUpdates = &apply_merge_result {
559                    metrics.compaction.not_applied_too_many_updates.inc();
560                }
561                metrics.compaction.noop.inc();
562                let mut part_deletes = PartDeletes::default();
563                for part in &res.output.parts {
564                    part_deletes.add(part);
565                }
566                part_deletes
567                    .delete(
568                        machine.applier.state_versions.blob.as_ref(),
569                        machine.shard_id(),
570                        GC_BLOB_DELETE_CONCURRENCY_LIMIT.get(&machine.applier.cfg),
571                        &*metrics,
572                        &metrics.retries.external.compaction_noop_delete,
573                    )
574                    .await;
575            }
576        };
577
578        Ok(maintenance)
579    }
580
581    /// Compacts input batches in bounded memory.
582    ///
583    /// The memory bound is broken into pieces:
584    ///     1. in-progress work
585    ///     2. fetching parts from runs
586    ///     3. additional in-flight requests to Blob
587    ///
588    /// 1. In-progress work is bounded by 2 * [BatchBuilderConfig::blob_target_size]. This
589    ///    usage is met at two mutually exclusive moments:
590    ///   * When reading in a part, we hold the columnar format in memory while writing its
591    ///     contents into a heap.
592    ///   * When writing a part, we hold a temporary updates buffer while encoding/writing
593    ///     it into a columnar format for Blob.
594    ///
595    /// 2. When compacting runs, only 1 part from each one is held in memory at a time.
596    ///    Compaction will determine an appropriate number of runs to compact together
597    ///    given the memory bound and accounting for the reservation in (1). A minimum
598    ///    of 2 * [BatchBuilderConfig::blob_target_size] of memory is expected, to be
599    ///    able to at least have the capacity to compact two runs together at a time,
600    ///    and more runs will be compacted together if more memory is available.
601    ///
602    /// 3. If there is excess memory after accounting for (1) and (2), we increase the
603    ///    number of outstanding parts we can keep in-flight to Blob.
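    ///
    /// A worked sketch of the budget split (illustrative numbers, not defaults):
    ///
    /// ```text
    /// compaction_memory_bound_bytes = 1 GiB, blob_target_size = 128 MiB
    /// (1) in-progress reserve = 2 * 128 MiB      = 256 MiB
    /// (2) run budget          = 1 GiB - 256 MiB  = 768 MiB
    ///     -> chunk together runs whose largest parts sum to at most 768 MiB
    /// (3) a chunk using only 512 MiB of (2) leaves
    ///     (768 MiB - 512 MiB) / 128 MiB = 2 extra outstanding parts for Blob
    /// ```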
604    pub fn compact_stream(
605        cfg: CompactConfig,
606        blob: Arc<dyn Blob>,
607        metrics: Arc<Metrics>,
608        shard_metrics: Arc<ShardMetrics>,
609        isolated_runtime: Arc<IsolatedRuntime>,
610        req: CompactReq<T>,
611        write_schemas: Schemas<K, V>,
612        incremental_enabled: bool,
613    ) -> impl Stream<Item = Result<FueledMergeRes<T>, anyhow::Error>> {
614        async_stream::stream! {
615            let () = Self::validate_req(&req)?;
616
617            // We introduced a fast-path optimization in https://github.com/MaterializeInc/materialize/pull/15363
618            // but had to revert it due to a very scary bug. Here we count how many of our compaction reqs
619            // could be eligible for the optimization to better understand whether it's worth trying to
620            // reintroduce it.
621            let mut single_nonempty_batch = None;
622            for batch in &req.inputs {
623                if batch.batch.len > 0 {
624                    match single_nonempty_batch {
625                        None => single_nonempty_batch = Some(batch),
626                        Some(_previous_nonempty_batch) => {
627                            single_nonempty_batch = None;
628                            break;
629                        }
630                    }
631                }
632            }
633            if let Some(single_nonempty_batch) = single_nonempty_batch {
634                if single_nonempty_batch.batch.run_splits.len() == 0
635                    && single_nonempty_batch.batch.desc.since() != &Antichain::from_elem(T::minimum())
636                {
637                    metrics.compaction.fast_path_eligible.inc();
638                }
639            }
640
641            // Reserve space for the in-progress part, held both as an in-mem representation and in columnar form -
642            let in_progress_part_reserved_memory_bytes = 2 * cfg.batch.blob_target_size;
643            // - then remaining memory will go towards pulling down as many runs as we can.
644            // We'll always do at least two runs per chunk, which means we may go over this limit
645            // if parts are large or the limit is low... though we do at least increment a metric
646            // when that happens.
647            let run_reserved_memory_bytes = cfg
648                .compaction_memory_bound_bytes
649                .saturating_sub(in_progress_part_reserved_memory_bytes);
650
651            let chunked_runs = Self::chunk_runs(
652                &req,
653                &cfg,
654                &*metrics,
655                run_reserved_memory_bytes,
656                req.desc.since()
657            );
658            let total_chunked_runs = chunked_runs.len();
659
660            let parts_before = req.inputs.iter().map(|x| x.batch.parts.len()).sum::<usize>();
661            let parts_after = chunked_runs.iter().flat_map(|(_, _, runs, _)| runs.iter().map(|(_, _, parts)| parts.len())).sum::<usize>();
662            assert_eq!(parts_before, parts_after, "chunking should not change the number of parts");
663
664            for (applied, (input, desc, runs, run_chunk_max_memory_usage)) in
665                chunked_runs.into_iter().enumerate()
666            {
667                metrics.compaction.chunks_compacted.inc();
668                metrics
669                    .compaction
670                    .runs_compacted
671                    .inc_by(u64::cast_from(runs.len()));
672
673                // given the runs we actually have in our batch, we might have extra memory
674                // available. we reserved enough space to always have 1 in-progress part in
675                // flight, but if we have excess, we can use it to increase our write parallelism
676                let extra_outstanding_parts = (run_reserved_memory_bytes
677                    .saturating_sub(run_chunk_max_memory_usage))
678                    / cfg.batch.blob_target_size;
679                let mut run_cfg = cfg.clone();
680                run_cfg.batch.batch_builder_max_outstanding_parts = 1 + extra_outstanding_parts;
681
682                let desc = if incremental_enabled {
683                    desc
684                } else {
685                    req.desc.clone()
686                };
687
688                let runs = runs.iter()
689                    .map(|(desc, meta, run)| (*desc, *meta, *run))
690                    .collect::<Vec<_>>();
691
692                let batch = Self::compact_runs(
693                    &run_cfg,
694                    &req.shard_id,
695                    &desc,
696                    runs,
697                    Arc::clone(&blob),
698                    Arc::clone(&metrics),
699                    Arc::clone(&shard_metrics),
700                    Arc::clone(&isolated_runtime),
701                    write_schemas.clone(),
702                )
703                .await?;
704
705                assert!(
706                    (batch.len == 0 && batch.parts.len() == 0) || (batch.len > 0 && batch.parts.len() > 0),
707                    "updates={}, parts={}",
708                    batch.len,
709                    batch.parts.len(),
710                );
711
712                // Set up active compaction metadata
713                let clock = cfg.now.clone();
714                let active_compaction = if applied < total_chunked_runs - 1 {
715                    Some(ActiveCompaction { start_ms: clock() })
716                } else {
717                    None
718                };
719
720                let res = CompactRes {
721                    output: batch,
722                    input,
723                };
724
725                let res = FueledMergeRes {
726                    output: res.output,
727                    new_active_compaction: active_compaction,
728                    input: res.input,
729                };
730
731                yield Ok(res);
732            }
733        }
734    }
735
736    /// Compacts the input batches together, returning a single compacted batch.
737    /// Under the hood this just calls [Self::compact_stream] and
738    /// [Self::compact_all], but it is a convenience method that allows
739    /// the caller to not have to deal with the streaming API.
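    ///
    /// A call sketch (mirroring the `regression_minimum_since` test below):
    ///
    /// ```ignore
    /// let res = Compactor::<String, String, u64, i64>::compact(
    ///     CompactConfig::new(&write.cfg, write.shard_id()),
    ///     Arc::clone(&write.blob),
    ///     Arc::clone(&write.metrics),
    ///     write.metrics.shards.shard(&write.machine.shard_id(), ""),
    ///     Arc::new(IsolatedRuntime::new_for_tests()),
    ///     req.clone(),
    ///     schemas.clone(),
    /// )
    /// .await?;
    /// ```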
740    pub async fn compact(
741        cfg: CompactConfig,
742        blob: Arc<dyn Blob>,
743        metrics: Arc<Metrics>,
744        shard_metrics: Arc<ShardMetrics>,
745        isolated_runtime: Arc<IsolatedRuntime>,
746        req: CompactReq<T>,
747        write_schemas: Schemas<K, V>,
748    ) -> Result<CompactRes<T>, anyhow::Error> {
749        let stream = Self::compact_stream(
750            cfg,
751            Arc::clone(&blob),
752            Arc::clone(&metrics),
753            Arc::clone(&shard_metrics),
754            Arc::clone(&isolated_runtime),
755            req.clone(),
756            write_schemas,
757            false,
758        );
759
760        Self::compact_all(stream, req).await
761    }
762
763    /// Chunks runs with the following rules:
764    /// 1. Runs from multiple batches are allowed to be mixed as long as _every_ run in each
765    ///    batch is present in the chunk.
766    /// 2. Otherwise, runs are split into chunks of runs from a single batch.
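    ///
    /// A hypothetical outcome (sizes are per-run max part bytes; a 100 MiB run
    /// budget is assumed):
    ///
    /// ```text
    /// batch A (2 runs, ~30 MiB) + batch B (1 run, ~40 MiB)
    ///     -> one mixed chunk (rule 1: every run of A and B is included)
    /// batch C (4 runs, ~300 MiB)
    ///     -> split into several single-batch chunks (rule 2), each tracked
    ///        as a CompactionInput::PartialBatch
    /// ```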
767    fn chunk_runs<'a>(
768        req: &'a CompactReq<T>,
769        cfg: &CompactConfig,
770        metrics: &Metrics,
771        run_reserved_memory_bytes: usize,
772        since: &Antichain<T>,
773    ) -> Vec<(
774        CompactionInput,
775        Description<T>,
776        Vec<(&'a Description<T>, &'a RunMeta, &'a [RunPart<T>])>,
777        usize,
778    )> {
779        // Assert that all of the inputs are contiguous / can be compacted together.
780        let _ = input_id_range(req.inputs.iter().map(|x| x.id).collect());
781
782        // Iterate through batches by spine id.
783        let mut batches: Vec<_> = req.inputs.iter().map(|x| (x.id, &*x.batch)).collect();
784        batches.sort_by_key(|(id, _)| *id);
785
786        let mut chunks = vec![];
787        let mut current_chunk_ids = BTreeSet::new();
788        let mut current_chunk_descs = Vec::new();
789        let mut current_chunk_runs = vec![];
790        let mut current_chunk_max_memory_usage = 0;
791
792        fn max_part_bytes<T>(parts: &[RunPart<T>], cfg: &CompactConfig) -> usize {
793            parts
794                .iter()
795                .map(|p| p.max_part_bytes())
796                .max()
797                .unwrap_or(cfg.batch.blob_target_size)
798        }
799
800        fn desc_range<T: Timestamp>(
801            descs: impl IntoIterator<Item = Description<T>>,
802            since: Antichain<T>,
803        ) -> Description<T> {
804            let mut descs = descs.into_iter();
805            let first = descs.next().expect("non-empty set of descriptions");
806            let lower = first.lower().clone();
807            let mut upper = first.upper().clone();
808            for desc in descs {
809                assert_eq!(&upper, desc.lower());
810                upper = desc.upper().clone();
811            }
812            let upper = upper.clone();
813            Description::new(lower, upper, since)
814        }
815
816        for (spine_id, batch) in batches {
817            let batch_size = batch
818                .runs()
819                .map(|(_, parts)| max_part_bytes(parts, cfg))
820                .sum::<usize>();
821
822            let num_runs = batch.run_meta.len();
823
824            let runs = batch.runs().flat_map(|(meta, parts)| {
825                if meta.order.unwrap_or(RunOrder::Codec) == cfg.batch.preferred_order {
826                    Either::Left(std::iter::once((&batch.desc, meta, parts)))
827                } else {
828                    // The downstream consolidation step will handle a long run that's not in
829                    // the desired order by splitting it up into many single-element runs. This preserves
830                    // correctness, but it means that we may end up needing to iterate through
831                    // many more parts concurrently than expected, increasing memory use. Instead,
832                    // we break up those runs into individual batch parts, fetching hollow runs as
833                    // necessary, before they're grouped together to be passed to consolidation.
834                    // The downside is that this breaks the usual property that compaction produces
835                    // fewer runs than it takes in. This should generally be resolved by future
836                    // runs of compaction.
837                    soft_assert_or_log!(
838                        !parts.iter().any(|r| matches!(r, RunPart::Many(_))),
839                        "unexpected out-of-order hollow run"
840                    );
841                    Either::Right(
842                        parts
843                            .iter()
844                            .map(move |p| (&batch.desc, meta, std::slice::from_ref(p))),
845                    )
846                }
847            });
848
849            // Combine the given batch into the current chunk
850            // - if the combined runs fit within the memory budget, or
851            // - if the chunk and batch together have at most two runs, since otherwise compaction couldn't make progress.
852            if current_chunk_max_memory_usage + batch_size <= run_reserved_memory_bytes
853                || current_chunk_runs.len() + num_runs <= 2
854            {
855                if current_chunk_max_memory_usage + batch_size > run_reserved_memory_bytes {
856                    // We've chosen to merge these batches together despite being over budget,
857                    // which should be rare.
858                    metrics.compaction.memory_violations.inc();
859                }
860                current_chunk_ids.insert(spine_id);
861                current_chunk_descs.push(batch.desc.clone());
862                current_chunk_runs.extend(runs);
863                current_chunk_max_memory_usage += batch_size;
864                continue;
865            }
866
867            // Otherwise, we cannot mix this batch partially. Flush any existing mixed chunk first.
868            if !current_chunk_ids.is_empty() {
869                chunks.push((
870                    input_id_range(std::mem::take(&mut current_chunk_ids)),
871                    desc_range(mem::take(&mut current_chunk_descs), since.clone()),
872                    std::mem::take(&mut current_chunk_runs),
873                    current_chunk_max_memory_usage,
874                ));
875                current_chunk_max_memory_usage = 0;
876            }
877
878            // If the batch fits within limits, try and accumulate future batches into it.
879            if batch_size <= run_reserved_memory_bytes {
880                current_chunk_ids.insert(spine_id);
881                current_chunk_descs.push(batch.desc.clone());
882                current_chunk_runs.extend(runs);
883                current_chunk_max_memory_usage += batch_size;
884                continue;
885            }
886
887            // This batch is too large to compact with others, or even in a single go.
888            // Process this batch alone, splitting into single-batch chunks as needed.
889            let mut run_iter = runs.into_iter().peekable();
890            debug_assert!(current_chunk_ids.is_empty());
891            debug_assert!(current_chunk_descs.is_empty());
892            debug_assert!(current_chunk_runs.is_empty());
893            debug_assert_eq!(current_chunk_max_memory_usage, 0);
894            let mut current_chunk_run_ids = BTreeSet::new();
895
896            while let Some((desc, meta, parts)) = run_iter.next() {
897                let run_size = max_part_bytes(parts, cfg);
898                current_chunk_runs.push((desc, meta, parts));
899                current_chunk_max_memory_usage += run_size;
900                current_chunk_run_ids.extend(meta.id);
901
902                if let Some((_, _meta, next_parts)) = run_iter.peek() {
903                    let next_size = max_part_bytes(next_parts, cfg);
904                    if current_chunk_max_memory_usage + next_size > run_reserved_memory_bytes {
905                        // If the current chunk only has one run, record a memory violation metric.
906                        if current_chunk_runs.len() == 1 {
907                            metrics.compaction.memory_violations.inc();
908                            continue;
909                        }
910                        // Flush the current chunk and start a new one.
911                        chunks.push((
912                            CompactionInput::PartialBatch(
913                                spine_id,
914                                mem::take(&mut current_chunk_run_ids),
915                            ),
916                            desc_range([batch.desc.clone()], since.clone()),
917                            std::mem::take(&mut current_chunk_runs),
918                            current_chunk_max_memory_usage,
919                        ));
920                        current_chunk_max_memory_usage = 0;
921                    }
922                }
923            }
924
925            if !current_chunk_runs.is_empty() {
926                chunks.push((
927                    CompactionInput::PartialBatch(spine_id, mem::take(&mut current_chunk_run_ids)),
928                    desc_range([batch.desc.clone()], since.clone()),
929                    std::mem::take(&mut current_chunk_runs),
930                    current_chunk_max_memory_usage,
931                ));
932                current_chunk_max_memory_usage = 0;
933            }
934        }
935
936        // If we ended with a mixed-batch chunk in progress, flush it.
937        if !current_chunk_ids.is_empty() {
938            chunks.push((
939                input_id_range(current_chunk_ids),
940                desc_range(current_chunk_descs, since.clone()),
941                current_chunk_runs,
942                current_chunk_max_memory_usage,
943            ));
944        }
945
946        chunks
947    }
948
949    /// Compacts runs together. If the input runs are sorted, a single run will be created as output.
950    ///
951    /// Maximum possible memory usage is `(# runs + 2) * [crate::PersistConfig::blob_target_size]`
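    ///
    /// For example, under that bound, compacting 6 runs with a 128 MiB
    /// `blob_target_size` may use up to `(6 + 2) * 128 MiB = 1 GiB`.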
952    pub(crate) async fn compact_runs(
953        cfg: &CompactConfig,
954        shard_id: &ShardId,
955        desc: &Description<T>,
956        runs: Vec<(&Description<T>, &RunMeta, &[RunPart<T>])>,
957        blob: Arc<dyn Blob>,
958        metrics: Arc<Metrics>,
959        shard_metrics: Arc<ShardMetrics>,
960        isolated_runtime: Arc<IsolatedRuntime>,
961        write_schemas: Schemas<K, V>,
962    ) -> Result<HollowBatch<T>, anyhow::Error> {
963        // TODO: Figure out a more principled way to allocate our memory budget.
964        // Currently, we give any excess budget to write parallelism. If we had
965        // to pick between 100% towards writes vs 100% towards reads, then reads
966        // is almost certainly better, but the ideal is probably somewhere in
967        // between the two.
968        //
969        // For now, invent some extra budget out of thin air for prefetch.
970        let prefetch_budget_bytes = 2 * cfg.batch.blob_target_size;
971
972        let mut timings = Timings::default();
973
974        let mut batch_cfg = cfg.batch.clone();
975
976        // Use compaction as a method of getting inline writes out of state, to
977        // make room for more inline writes. We could instead do this at the end
978        // of compaction by flushing out the batch, but doing it here based on
979        // the config allows BatchBuilder to do its normal pipelining of writes.
980        batch_cfg.inline_writes_single_max_bytes = 0;
981
982        let parts = BatchParts::new_ordered::<D>(
983            batch_cfg,
984            cfg.batch.preferred_order,
985            Arc::clone(&metrics),
986            Arc::clone(&shard_metrics),
987            *shard_id,
988            Arc::clone(&blob),
989            Arc::clone(&isolated_runtime),
990            &metrics.compaction.batch,
991        );
992        let mut batch = BatchBuilderInternal::<K, V, T, D>::new(
993            cfg.batch.clone(),
994            parts,
995            Arc::clone(&metrics),
996            write_schemas.clone(),
997            Arc::clone(&blob),
998            shard_id.clone(),
999            cfg.version.clone(),
1000        );
1001
1002        let mut consolidator = Consolidator::new(
1003            format!(
1004                "{}[lower={:?},upper={:?}]",
1005                shard_id,
1006                desc.lower().elements(),
1007                desc.upper().elements()
1008            ),
1009            cfg.fetch_config.clone(),
1010            *shard_id,
1011            StructuredSort::<K, V, T, D>::new(write_schemas.clone()),
1012            blob,
1013            Arc::clone(&metrics),
1014            shard_metrics,
1015            metrics.read.compaction.clone(),
1016            FetchBatchFilter::Compaction {
1017                since: desc.since().clone(),
1018            },
1019            None,
1020            prefetch_budget_bytes,
1021        );
1022
1023        for (desc, meta, parts) in runs {
1024            consolidator.enqueue_run(desc, meta, parts.iter().cloned());
1025        }
1026
1027        let remaining_budget = consolidator.start_prefetches();
1028        if remaining_budget.is_none() {
1029            metrics.compaction.not_all_prefetched.inc();
1030        }
1031
1032        loop {
1033            let mut chunks = vec![];
1034            let mut total_bytes = 0;
1035            // We attempt to pull chunks out of the consolidator that match our target size,
1036            // but it's possible that we may get smaller chunks... for example, if not all
1037            // parts have been fetched yet. Loop until we've got enough data to justify flushing
1038            // it out to blob (or we run out of data.)
1039            while total_bytes < cfg.batch.blob_target_size {
1040                let fetch_start = Instant::now();
1041                let Some(chunk) = consolidator
1042                    .next_chunk(
1043                        cfg.compaction_yield_after_n_updates,
1044                        cfg.batch.blob_target_size - total_bytes,
1045                    )
1046                    .await?
1047                else {
1048                    break;
1049                };
1050                timings.part_fetching += fetch_start.elapsed();
1051                total_bytes += chunk.goodbytes();
1052                chunks.push(chunk);
1053                tokio::task::yield_now().await;
1054            }
1055
1056            // In the hopefully-common case of a single chunk, this will not copy.
1057            let Some(updates) = Part::concat(&chunks).expect("compaction produces well-typed data")
1058            else {
1059                break;
1060            };
1061            batch.flush_part(desc.clone(), updates).await;
1062        }
1063        let mut batch = batch.finish(desc.clone()).await?;
1064
1065        // We use compaction as a method of getting inline writes out of state,
1066        // to make room for more inline writes. This happens above by zeroing out
1067        // `inline_writes_single_max_bytes` in the batch config. This is a bit
1068        // action-at-a-distance, so defensively detect if this breaks here and
1069        // log and correct it if so.
1070        let has_inline_parts = batch.batch.parts.iter().any(|x| x.is_inline());
1071        if has_inline_parts {
1072            error!(%shard_id, ?cfg, "compaction result unexpectedly had inline writes");
1073            let () = batch
1074                .flush_to_blob(
1075                    &cfg.batch,
1076                    &metrics.compaction.batch,
1077                    &isolated_runtime,
1078                    &write_schemas,
1079                )
1080                .await;
1081        }
1082
1083        timings.record(&metrics);
1084        Ok(batch.into_hollow_batch())
1085    }
1086
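    /// Validates that the inputs tile the output description exactly: each
    /// input's lower must match the previous upper, the final upper must match
    /// the output's upper, and the output since must be at or beyond every
    /// input's since.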
1087    fn validate_req(req: &CompactReq<T>) -> Result<(), anyhow::Error> {
1088        let mut frontier = req.desc.lower();
1089        for input in req.inputs.iter() {
1090            if PartialOrder::less_than(req.desc.since(), input.batch.desc.since()) {
1091                return Err(anyhow!(
1092                    "output since {:?} must be at or in advance of input since {:?}",
1093                    req.desc.since(),
1094                    input.batch.desc.since()
1095                ));
1096            }
1097            if frontier != input.batch.desc.lower() {
1098                return Err(anyhow!(
1099                    "invalid merge of non-consecutive batches {:?} vs {:?}",
1100                    frontier,
1101                    input.batch.desc.lower()
1102                ));
1103            }
1104            frontier = input.batch.desc.upper();
1105        }
1106        if frontier != req.desc.upper() {
1107            return Err(anyhow!(
1108                "invalid merge of non-consecutive batches {:?} vs {:?}",
1109                frontier,
1110                req.desc.upper()
1111            ));
1112        }
1113        Ok(())
1114    }
1115}
1116
1117#[derive(Debug, Default)]
1118struct Timings {
1119    part_fetching: Duration,
1120    heap_population: Duration,
1121}
1122
1123impl Timings {
1124    fn record(self, metrics: &Metrics) {
1125        // intentionally deconstruct so we don't forget to consider each field
1126        let Timings {
1127            part_fetching,
1128            heap_population,
1129        } = self;
1130
1131        metrics
1132            .compaction
1133            .steps
1134            .part_fetch_seconds
1135            .inc_by(part_fetching.as_secs_f64());
1136        metrics
1137            .compaction
1138            .steps
1139            .heap_population_seconds
1140            .inc_by(heap_population.as_secs_f64());
1141    }
1142}
1143
1144#[cfg(test)]
1145mod tests {
1146    use mz_dyncfg::ConfigUpdates;
1147    use mz_ore::{assert_contains, assert_err};
1148    use mz_persist_types::codec_impls::StringSchema;
1149    use timely::progress::Antichain;
1150
1151    use crate::PersistLocation;
1152    use crate::batch::BLOB_TARGET_SIZE;
1153    use crate::internal::trace::SpineId;
1154    use crate::tests::{all_ok, expect_fetch_part, new_test_client_cache};
1155
1156    use super::*;
1157
1158    // A regression test for a bug caught during development of materialize#13160 (never
1159    // made it to main) where batches written by compaction would always have a
1160    // since of the minimum timestamp.
1161    #[mz_persist_proc::test(tokio::test)]
1162    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
1163    async fn regression_minimum_since(dyncfgs: ConfigUpdates) {
1164        let data = vec![
1165            (("0".to_owned(), "zero".to_owned()), 0, 1),
1166            (("0".to_owned(), "zero".to_owned()), 1, -1),
1167            (("1".to_owned(), "one".to_owned()), 1, 1),
1168        ];
1169
1170        let cache = new_test_client_cache(&dyncfgs);
1171        cache.cfg.set_config(&BLOB_TARGET_SIZE, 100);
1172        let (mut write, _) = cache
1173            .open(PersistLocation::new_in_mem())
1174            .await
1175            .expect("client construction failed")
1176            .expect_open::<String, String, u64, i64>(ShardId::new())
1177            .await;
1178        let b0 = write
1179            .expect_batch(&data[..1], 0, 1)
1180            .await
1181            .into_hollow_batch();
1182        let b1 = write
1183            .expect_batch(&data[1..], 1, 2)
1184            .await
1185            .into_hollow_batch();
1186
1187        let req = CompactReq {
1188            shard_id: write.machine.shard_id(),
1189            desc: Description::new(
1190                b0.desc.lower().clone(),
1191                b1.desc.upper().clone(),
1192                Antichain::from_elem(10u64),
1193            ),
1194            inputs: vec![
1195                IdHollowBatch {
1196                    batch: Arc::new(b0),
1197                    id: SpineId(0, 1),
1198                },
1199                IdHollowBatch {
1200                    batch: Arc::new(b1),
1201                    id: SpineId(1, 2),
1202                },
1203            ],
1204        };
1205        let schemas = Schemas {
1206            id: None,
1207            key: Arc::new(StringSchema),
1208            val: Arc::new(StringSchema),
1209        };
1210        let res = Compactor::<String, String, u64, i64>::compact(
1211            CompactConfig::new(&write.cfg, write.shard_id()),
1212            Arc::clone(&write.blob),
1213            Arc::clone(&write.metrics),
1214            write.metrics.shards.shard(&write.machine.shard_id(), ""),
1215            Arc::new(IsolatedRuntime::new_for_tests()),
1216            req.clone(),
1217            schemas.clone(),
1218        )
1219        .await
1220        .expect("compaction failed");
1221
1222        assert_eq!(res.output.desc, req.desc);
1223        assert_eq!(res.output.len, 1);
1224        assert_eq!(res.output.part_count(), 1);
1225        let part = res.output.parts[0].expect_hollow_part();
1226        let (part, updates) = expect_fetch_part(
1227            write.blob.as_ref(),
1228            &part.key.complete(&write.machine.shard_id()),
1229            &write.metrics,
1230            &schemas,
1231        )
1232        .await;
1233        assert_eq!(part.desc, res.output.desc);
1234        assert_eq!(updates, all_ok(&data, 10));
1235    }
1236
1237    #[mz_persist_proc::test(tokio::test)]
1238    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
1239    async fn disable_compaction(dyncfgs: ConfigUpdates) {
1240        let data = [
1241            (("0".to_owned(), "zero".to_owned()), 0, 1),
1242            (("0".to_owned(), "zero".to_owned()), 1, -1),
1243            (("1".to_owned(), "one".to_owned()), 1, 1),
1244        ];
1245
1246        let cache = new_test_client_cache(&dyncfgs);
1247        cache.cfg.set_config(&BLOB_TARGET_SIZE, 100);
1248        let (mut write, _) = cache
1249            .open(PersistLocation::new_in_mem())
1250            .await
1251            .expect("client construction failed")
1252            .expect_open::<String, String, u64, i64>(ShardId::new())
1253            .await;
1254        let b0 = write
1255            .expect_batch(&data[..1], 0, 1)
1256            .await
1257            .into_hollow_batch();
1258        let b1 = write
1259            .expect_batch(&data[1..], 1, 2)
1260            .await
1261            .into_hollow_batch();
1262
1263        let req = CompactReq {
1264            shard_id: write.machine.shard_id(),
1265            desc: Description::new(
1266                b0.desc.lower().clone(),
1267                b1.desc.upper().clone(),
1268                Antichain::from_elem(10u64),
1269            ),
1270            inputs: vec![
1271                IdHollowBatch {
1272                    batch: Arc::new(b0),
1273                    id: SpineId(0, 1),
1274                },
1275                IdHollowBatch {
1276                    batch: Arc::new(b1),
1277                    id: SpineId(1, 2),
1278                },
1279            ],
1280        };
1281        write.cfg.set_config(&COMPACTION_HEURISTIC_MIN_INPUTS, 1);
1282        let compactor = write.compact.as_ref().expect("compaction hard disabled");
1283
1284        write.cfg.disable_compaction();
1285        let result = compactor
1286            .compact_and_apply_background(req.clone(), &write.machine)
1287            .expect("listener")
1288            .await
1289            .expect("channel closed");
1290        assert_err!(result);
1291        assert_contains!(result.unwrap_err().to_string(), "compaction disabled");
1292
1293        write.cfg.enable_compaction();
1294        compactor
1295            .compact_and_apply_background(req, &write.machine)
1296            .expect("listener")
1297            .await
1298            .expect("channel closed")
1299            .expect("compaction success");
1300
1301        // Make sure our CYA dyncfg works.
1302        let data2 = [
1303            (("2".to_owned(), "two".to_owned()), 2, 1),
1304            (("2".to_owned(), "two".to_owned()), 3, -1),
1305            (("3".to_owned(), "three".to_owned()), 3, 1),
1306        ];
1307
1308        let b2 = write
1309            .expect_batch(&data2[..1], 2, 3)
1310            .await
1311            .into_hollow_batch();
1312        let b3 = write
1313            .expect_batch(&data2[1..], 3, 4)
1314            .await
1315            .into_hollow_batch();
1316
1317        let req = CompactReq {
1318            shard_id: write.machine.shard_id(),
1319            desc: Description::new(
1320                b2.desc.lower().clone(),
1321                b3.desc.upper().clone(),
1322                Antichain::from_elem(20u64),
1323            ),
1324            inputs: vec![
1325                IdHollowBatch {
1326                    batch: Arc::new(b2),
1327                    id: SpineId(0, 1),
1328                },
1329                IdHollowBatch {
1330                    batch: Arc::new(b3),
1331                    id: SpineId(1, 2),
1332                },
1333            ],
1334        };
1335        let compactor = write.compact.as_ref().expect("compaction hard disabled");
1336
1337        // When the dyncfg is set to false we should ignore the process flag.
1338        write.cfg.set_config(&COMPACTION_CHECK_PROCESS_FLAG, false);
1339        write.cfg.disable_compaction();
1340        // Compaction still succeeded!
1341        compactor
1342            .compact_and_apply_background(req, &write.machine)
1343            .expect("listener")
1344            .await
1345            .expect("channel closed")
1346            .expect("compaction success");
1347    }
1348}