mz_persist_client/internal/compact.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::BTreeSet;
11use std::fmt::Debug;
12use std::marker::PhantomData;
13use std::mem;
14use std::sync::Arc;
15use std::time::{Duration, Instant};
16
17use anyhow::anyhow;
18use differential_dataflow::difference::Monoid;
19use differential_dataflow::lattice::Lattice;
20use differential_dataflow::trace::Description;
21use futures::{Stream, pin_mut};
22use futures_util::StreamExt;
23use itertools::Either;
24use mz_dyncfg::Config;
25use mz_ore::cast::CastFrom;
26use mz_ore::error::ErrorExt;
27use mz_ore::now::NowFn;
28use mz_ore::soft_assert_or_log;
29use mz_persist::location::Blob;
30use mz_persist_types::part::Part;
31use mz_persist_types::{Codec, Codec64};
32use timely::PartialOrder;
33use timely::progress::{Antichain, Timestamp};
34use tokio::sync::mpsc::Sender;
35use tokio::sync::{TryAcquireError, mpsc, oneshot};
36use tracing::{Instrument, Span, debug, debug_span, error, trace, warn};
37
38use crate::async_runtime::IsolatedRuntime;
39use crate::batch::{BatchBuilderConfig, BatchBuilderInternal, BatchParts, PartDeletes};
40use crate::cfg::{
41    COMPACTION_HEURISTIC_MIN_INPUTS, COMPACTION_HEURISTIC_MIN_PARTS,
42    COMPACTION_HEURISTIC_MIN_UPDATES, COMPACTION_MEMORY_BOUND_BYTES,
43    GC_BLOB_DELETE_CONCURRENCY_LIMIT, MiB,
44};
45use crate::fetch::{FetchBatchFilter, FetchConfig};
46use crate::internal::encoding::Schemas;
47use crate::internal::gc::GarbageCollector;
48use crate::internal::machine::Machine;
49use crate::internal::maintenance::RoutineMaintenance;
50use crate::internal::metrics::ShardMetrics;
51use crate::internal::state::{HollowBatch, RunMeta, RunOrder, RunPart};
52use crate::internal::trace::{
53    ActiveCompaction, ApplyMergeResult, CompactionInput, FueledMergeRes, IdHollowBatch, SpineId,
54    id_range,
55};
56use crate::iter::{Consolidator, StructuredSort};
57use crate::{Metrics, PersistConfig, ShardId};
58
59/// A request for compaction.
60///
61/// This is similar to FueledMergeReq, but intentionally a different type. If we
62/// move compaction to an rpc server, this one will become a protobuf; the type
63/// parameters will become names of codecs to look up in some registry.
64#[derive(Debug, Clone)]
65pub struct CompactReq<T> {
66    /// The shard the input and output batches belong to.
67    pub shard_id: ShardId,
68    /// A description for the output batch.
69    pub desc: Description<T>,
70    /// The updates to include in the output batch. Any data in these outside of
71    /// the output description's bounds should be ignored.
72    pub inputs: Vec<IdHollowBatch<T>>,
73}
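
// Illustrative only (not part of the original file): a minimal sketch of how a
// `CompactReq` can be assembled by hand, in the spirit of the tests at the
// bottom of this file. The bounds, the empty `HollowBatch`, and the `SpineId`
// are placeholder values.
#[cfg(test)]
#[allow(dead_code)]
fn example_compact_req() -> CompactReq<u64> {
    let desc = Description::new(
        Antichain::from_elem(0u64),
        Antichain::from_elem(10u64),
        Antichain::from_elem(5u64),
    );
    CompactReq {
        shard_id: ShardId::new(),
        desc: desc.clone(),
        // A real request carries the batches handed over by Spine; an empty
        // hollow batch stands in for one here.
        inputs: vec![IdHollowBatch {
            batch: Arc::new(HollowBatch::new(desc, vec![], 0, vec![], vec![])),
            id: SpineId(0, 1),
        }],
    }
}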
74
75/// A response from compaction.
76#[derive(Debug)]
77pub struct CompactRes<T> {
78    /// The compacted batch.
79    pub output: HollowBatch<T>,
80    /// The runs that were compacted together to produce the output batch.
81    pub input: CompactionInput,
82}
83
84/// A snapshot of dynamic configs to make it easier to reason about an
85/// individual run of compaction.
86#[derive(Debug, Clone)]
87pub struct CompactConfig {
88    pub(crate) compaction_memory_bound_bytes: usize,
89    pub(crate) compaction_yield_after_n_updates: usize,
90    pub(crate) version: semver::Version,
91    pub(crate) batch: BatchBuilderConfig,
92    pub(crate) fetch_config: FetchConfig,
93    pub(crate) now: NowFn,
94}
95
96impl CompactConfig {
97    /// Initialize the compaction config from Persist configuration.
98    pub fn new(value: &PersistConfig, shard_id: ShardId) -> Self {
99        CompactConfig {
100            compaction_memory_bound_bytes: COMPACTION_MEMORY_BOUND_BYTES.get(value),
101            compaction_yield_after_n_updates: value.compaction_yield_after_n_updates,
102            version: value.build_version.clone(),
103            batch: BatchBuilderConfig::new(value, shard_id),
104            fetch_config: FetchConfig::from_persist_config(value),
105            now: value.now.clone(),
106        }
107    }
108}
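
// For illustration (hypothetical bindings; values are examples): callers
// snapshot the dynamic configs once per compaction run, and crate-internal
// code can then tweak individual knobs on the snapshot, e.g.:
//
//     let mut cfg = CompactConfig::new(&persist_cfg, shard_id);
//     cfg.batch.blob_target_size = 4 * MiB; // shrink parts, e.g. for a test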
109
110/// A service for performing physical and logical compaction.
111///
112/// This will possibly be called over RPC in the future. Physical compaction is
113/// merging adjacent batches. Logical compaction is advancing timestamps to a
114/// new since and consolidating the resulting updates.
115#[derive(Debug)]
116pub struct Compactor<K, V, T, D> {
117    cfg: PersistConfig,
118    metrics: Arc<Metrics>,
119    sender: Sender<(
120        Instant,
121        CompactReq<T>,
122        Machine<K, V, T, D>,
123        oneshot::Sender<Result<(), anyhow::Error>>,
124    )>,
125    _phantom: PhantomData<fn() -> D>,
126}
127
128impl<K, V, T, D> Clone for Compactor<K, V, T, D> {
129    fn clone(&self) -> Self {
130        Compactor {
131            cfg: self.cfg.clone(),
132            metrics: Arc::clone(&self.metrics),
133            sender: self.sender.clone(),
134            _phantom: Default::default(),
135        }
136    }
137}
138
139/// In Compactor::compact_and_apply_background, the minimum amount of time to
140/// allow a compaction request to run before timing it out. A request may be
141/// given a timeout greater than this value depending on the inputs' size.
142pub(crate) const COMPACTION_MINIMUM_TIMEOUT: Config<Duration> = Config::new(
143    "persist_compaction_minimum_timeout",
144    Duration::from_secs(90),
145    "\
146    The minimum amount of time to allow a persist compaction request to run \
147    before timing it out (Materialize).",
148);
149
150pub(crate) const COMPACTION_CHECK_PROCESS_FLAG: Config<bool> = Config::new(
151    "persist_compaction_check_process_flag",
152    true,
153    "Whether Compactor will obey the process_requests flag in PersistConfig, \
154        which allows dynamically disabling compaction. If false, all compaction requests will be processed.",
155);
156
157/// Create a [`CompactionInput::IdRange`] from a set of [`SpineId`]s.
158fn input_id_range(ids: BTreeSet<SpineId>) -> CompactionInput {
159    let id = id_range(ids);
160
161    CompactionInput::IdRange(id)
162}
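
// For example (assuming the usual contiguity invariant): spine ids (0, 1) and
// (1, 2) collapse into a single id range covering (0, 2); `id_range` is
// expected to reject sets with gaps.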
163
164impl<K, V, T, D> Compactor<K, V, T, D>
165where
166    K: Debug + Codec,
167    V: Debug + Codec,
168    T: Timestamp + Lattice + Codec64 + Sync,
169    D: Monoid + Ord + Codec64 + Send + Sync,
170{
171    pub fn new(
172        cfg: PersistConfig,
173        metrics: Arc<Metrics>,
174        gc: GarbageCollector<K, V, T, D>,
175    ) -> Self {
176        let (compact_req_sender, mut compact_req_receiver) = mpsc::channel::<(
177            Instant,
178            CompactReq<T>,
179            Machine<K, V, T, D>,
180            oneshot::Sender<Result<(), anyhow::Error>>,
181        )>(cfg.compaction_queue_size);
182        let concurrency_limit = Arc::new(tokio::sync::Semaphore::new(
183            cfg.compaction_concurrency_limit,
184        ));
185        let check_process_requests = COMPACTION_CHECK_PROCESS_FLAG.handle(&cfg.configs);
186        let process_requests = Arc::clone(&cfg.compaction_process_requests);
187
188        // spin off a single task responsible for executing compaction requests.
189        // work is enqueued into the task through a channel
190        let _worker_handle = mz_ore::task::spawn(|| "PersistCompactionScheduler", async move {
191            while let Some((enqueued, req, machine, completer)) = compact_req_receiver.recv().await
192            {
193                assert_eq!(req.shard_id, machine.shard_id());
194                let metrics = Arc::clone(&machine.applier.metrics);
195
196                // Only allow skipping compaction requests if the dyncfg is enabled.
197                if check_process_requests.get()
198                    && !process_requests.load(std::sync::atomic::Ordering::Relaxed)
199                {
200                    // Respond to the requester, track in our metrics, and log
201                    // that compaction is disabled.
202                    let _ = completer.send(Err(anyhow::anyhow!("compaction disabled")));
203                    metrics.compaction.disabled.inc();
204                    tracing::warn!(shard_id = ?req.shard_id, "Dropping compaction request on the floor.");
205
206                    continue;
207                }
208
209                let permit = {
210                    let inner = Arc::clone(&concurrency_limit);
211                    // perform a non-blocking attempt to acquire a permit so we can
212                    // record how often we're ever blocked on the concurrency limit
213                    match inner.try_acquire_owned() {
214                        Ok(permit) => permit,
215                        Err(TryAcquireError::NoPermits) => {
216                            metrics.compaction.concurrency_waits.inc();
217                            Arc::clone(&concurrency_limit)
218                                .acquire_owned()
219                                .await
220                                .expect("semaphore is never closed")
221                        }
222                        Err(TryAcquireError::Closed) => {
223                            // should never happen in practice. the semaphore is
224                            // never explicitly closed, nor will it close on Drop
225                            warn!("semaphore for shard {} is closed", machine.shard_id());
226                            continue;
227                        }
228                    }
229                };
230                metrics
231                    .compaction
232                    .queued_seconds
233                    .inc_by(enqueued.elapsed().as_secs_f64());
234
235                let compact_span =
236                    debug_span!(parent: None, "compact::apply", shard_id=%machine.shard_id());
237                compact_span.follows_from(&Span::current());
238                let gc = gc.clone();
239                mz_ore::task::spawn(|| "PersistCompactionWorker", async move {
240                    let res = Self::compact_and_apply(&machine, req)
241                        .instrument(compact_span)
242                        .await;
243
244                    match res {
245                        Ok(maintenance) => maintenance.start_performing(&machine, &gc),
246                        Err(err) => {
247                            debug!(shard_id =? machine.shard_id(), "compaction failed: {err:#}")
248                        }
249                    }
250
251                    // we can safely ignore errors here, it's possible the caller
252                    // wasn't interested in waiting and dropped their receiver
253                    let _ = completer.send(Ok(()));
254
255                    // moves `permit` into async scope so it can be dropped upon completion
256                    drop(permit);
257                });
258            }
259        });
260
261        Compactor {
262            cfg,
263            metrics,
264            sender: compact_req_sender,
265            _phantom: PhantomData,
266        }
267    }
268
269    /// Enqueues a [CompactReq] to be consumed by the compaction background task when available.
270    ///
271    /// Returns a receiver that indicates when compaction has completed. The receiver can be
272    /// safely dropped at any time if the caller does not wish to wait on completion.
273    pub fn compact_and_apply_background(
274        &self,
275        req: CompactReq<T>,
276        machine: &Machine<K, V, T, D>,
277    ) -> Option<oneshot::Receiver<Result<(), anyhow::Error>>> {
278        // Run some initial heuristics to ignore some requests for compaction.
279        // We don't gain much from e.g. compacting two very small batches that
280        // were just written, but it does result in non-trivial blob traffic
281        // (especially in aggregate). This heuristic is something we'll need to
282        // tune over time.
283        let should_compact = req.inputs.len() >= COMPACTION_HEURISTIC_MIN_INPUTS.get(&self.cfg)
284            || req
285                .inputs
286                .iter()
287                .map(|x| x.batch.part_count())
288                .sum::<usize>()
289                >= COMPACTION_HEURISTIC_MIN_PARTS.get(&self.cfg)
290            || req.inputs.iter().map(|x| x.batch.len).sum::<usize>()
291                >= COMPACTION_HEURISTIC_MIN_UPDATES.get(&self.cfg);
292        if !should_compact {
293            self.metrics.compaction.skipped.inc();
294            return None;
295        }
296
297        let (compaction_completed_sender, compaction_completed_receiver) = oneshot::channel();
298        let new_compaction_sender = self.sender.clone();
299
300        self.metrics.compaction.requested.inc();
301        // NB: we intentionally pass along the input machine, as it ought to come from the
302        // writer that generated the compaction request / maintenance. this machine has a
303        // spine structure that generated the request, so it has a much better chance of
304        // merging and committing the result than a machine kept up-to-date through state
305        // diffs, which may have a different spine structure less amenable to merging.
306        let send = new_compaction_sender.try_send((
307            Instant::now(),
308            req,
309            machine.clone(),
310            compaction_completed_sender,
311        ));
312        if let Err(_) = send {
313            self.metrics.compaction.dropped.inc();
314            return None;
315        }
316
317        Some(compaction_completed_receiver)
318    }
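
    // Sketch of how a caller might consume the returned receiver (the
    // `compactor`, `req`, and `machine` bindings are hypothetical); dropping
    // the receiver without awaiting it is also fine:
    //
    //     if let Some(rx) = compactor.compact_and_apply_background(req, &machine) {
    //         match rx.await {
    //             Ok(Ok(())) => { /* the request was processed */ }
    //             Ok(Err(err)) => { /* the request was rejected, e.g. compaction disabled */ }
    //             Err(_) => { /* the worker dropped the completer */ }
    //         }
    //     }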
319
320    pub(crate) async fn compact_and_apply(
321        machine: &Machine<K, V, T, D>,
322        req: CompactReq<T>,
323    ) -> Result<RoutineMaintenance, anyhow::Error> {
324        let metrics = Arc::clone(&machine.applier.metrics);
325        metrics.compaction.started.inc();
326        let start = Instant::now();
327
328        // pick a timeout for our compaction request proportional to the amount
329        // of data that must be read (with a minimum set by PersistConfig)
330        let total_input_bytes = req
331            .inputs
332            .iter()
333            .map(|batch| batch.batch.encoded_size_bytes())
334            .sum::<usize>();
335        let timeout = Duration::max(
336            // either our minimum timeout
337            COMPACTION_MINIMUM_TIMEOUT.get(&machine.applier.cfg),
338            // or 1s per MB of input data
339            Duration::from_secs(u64::cast_from(total_input_bytes / MiB)),
340        );
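        // For example (numbers illustrative): with the default 90s minimum, a
        // request covering 512MiB of input gets max(90s, 512s) = 512s of
        // timeout, while a 16MiB request keeps the 90s floor.
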
341        // Always use the most recent schema from all the Runs we're compacting, to prevent
342        // Compactors created before the schema was evolved from trying to "de-evolve" a Part.
343        let Some(compaction_schema_id) = req
344            .inputs
345            .iter()
346            .flat_map(|batch| batch.batch.run_meta.iter())
347            .filter_map(|run_meta| run_meta.schema)
348            // It's an invariant that SchemaIds are ordered.
349            .max()
350        else {
351            metrics.compaction.schema_selection.no_schema.inc();
352            metrics.compaction.failed.inc();
353            return Err(anyhow!(
354                "compacting {shard_id} and spine ids {spine_ids}: could not determine schema id from inputs",
355                shard_id = req.shard_id,
356                spine_ids = mz_ore::str::separated(", ", req.inputs.iter().map(|i| i.id))
357            ));
358        };
359        let Some((key_schema, val_schema)) = machine.get_schema(compaction_schema_id) else {
360            metrics.compaction.schema_selection.no_schema.inc();
361            metrics.compaction.failed.inc();
362            return Err(anyhow!(
363                "compacting {shard_id} and spine ids {spine_ids}: schema id {compaction_schema_id} not present in machine state",
364                shard_id = req.shard_id,
365                spine_ids = mz_ore::str::separated(", ", req.inputs.iter().map(|i| i.id))
366            ));
367        };
368
369        metrics.compaction.schema_selection.recent_schema.inc();
370
371        let compaction_schema = Schemas {
372            id: Some(compaction_schema_id),
373            key: Arc::new(key_schema),
374            val: Arc::new(val_schema),
375        };
376
377        trace!(
378            "compaction request for {}MBs ({} bytes), with timeout of {}s, and schema {:?}.",
379            total_input_bytes / MiB,
380            total_input_bytes,
381            timeout.as_secs_f64(),
382            compaction_schema.id,
383        );
384
385        let isolated_runtime = Arc::clone(&machine.isolated_runtime);
386        let machine_clone = machine.clone();
387        let metrics_clone = Arc::clone(&machine.applier.metrics);
388        let compact_span = debug_span!("compact::consolidate");
389        let res = tokio::time::timeout(
390            timeout,
391            // Compaction is cpu intensive, so be polite and spawn it on the isolated runtime.
392            isolated_runtime.spawn_named(
393                || "persist::compact::consolidate",
394                async move {
395                    // If the batches we are compacting were written with old versions of persist,
396                    // we may not have run UUIDs for them, meaning we don't have enough info to
397                    // safely compact them incrementally.
398                    let all_runs_have_uuids = req
399                        .inputs
400                        .iter()
401                        .all(|x| x.batch.runs().all(|(meta, _)| meta.id.is_some()));
402                    let all_runs_have_len = req
403                        .inputs
404                        .iter()
405                        .all(|x| x.batch.runs().all(|(meta, _)| meta.len.is_some()));
406
407                    let compact_cfg =
408                        CompactConfig::new(&machine_clone.applier.cfg, machine_clone.shard_id());
409                    let incremental_enabled = compact_cfg.batch.enable_incremental_compaction
410                        && all_runs_have_uuids
411                        && all_runs_have_len;
412                    let stream = Self::compact_stream(
413                        compact_cfg,
414                        Arc::clone(&machine_clone.applier.state_versions.blob),
415                        Arc::clone(&metrics_clone),
416                        Arc::clone(&machine_clone.applier.shard_metrics),
417                        Arc::clone(&machine_clone.isolated_runtime),
418                        req.clone(),
419                        compaction_schema,
420                        incremental_enabled,
421                    );
422
423                    let maintenance = if incremental_enabled {
424                        let mut maintenance = RoutineMaintenance::default();
425                        pin_mut!(stream);
426                        while let Some(res) = stream.next().await {
427                            let res = res?;
428                            let new_maintenance =
429                                Self::apply(res, &metrics_clone, &machine_clone).await?;
430                            maintenance.merge(new_maintenance);
431                        }
432                        maintenance
433                    } else {
434                        let res = Self::compact_all(stream, req.clone()).await?;
435                        Self::apply(
436                            FueledMergeRes {
437                                output: res.output,
438                                input: CompactionInput::Legacy,
439                                new_active_compaction: None,
440                            },
441                            &metrics_clone,
442                            &machine_clone,
443                        )
444                        .await?
445                    };
446
447                    Ok::<_, anyhow::Error>(maintenance)
448                }
449                .instrument(compact_span),
450            ),
451        )
452        .await;
453
454        metrics
455            .compaction
456            .seconds
457            .inc_by(start.elapsed().as_secs_f64());
458        let res = res.map_err(|e| {
459            metrics.compaction.timed_out.inc();
460            anyhow!(
461                "compaction timed out after {}s: {}",
462                timeout.as_secs_f64(),
463                e
464            )
465        })?;
466
467        match res {
468            Ok(maintenance) => Ok(maintenance),
469            Err(err) => {
470                metrics.compaction.failed.inc();
471                debug!(
472                    "compaction for {} failed: {}",
473                    machine.shard_id(),
474                    err.display_with_causes()
475                );
476                Err(err)
477            }
478        }
479    }
480
481    pub async fn compact_all(
482        stream: impl Stream<Item = Result<FueledMergeRes<T>, anyhow::Error>>,
483        req: CompactReq<T>,
484    ) -> Result<CompactRes<T>, anyhow::Error> {
485        pin_mut!(stream);
486
487        let mut all_parts = vec![];
488        let mut all_run_splits = vec![];
489        let mut all_run_meta = vec![];
490        let mut len = 0;
491
492        while let Some(res) = stream.next().await {
493            let res = res?.output;
494            let (parts, updates, run_meta, run_splits) =
495                (res.parts, res.len, res.run_meta, res.run_splits);
496
497            if updates == 0 {
498                continue;
499            }
500
501            let run_offset = all_parts.len();
502            if !all_parts.is_empty() {
503                all_run_splits.push(run_offset);
504            }
505            all_run_splits.extend(run_splits.iter().map(|r| r + run_offset));
506            all_run_meta.extend(run_meta);
507            all_parts.extend(parts);
508            len += updates;
509        }
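
        // Worked example of the splice above: if the accumulator already holds
        // 3 parts with run splits [1] and the next result contributes parts
        // [a, b, c] with run splits [2], the new run starts at offset 3 and the
        // combined splits become [1, 3, 5].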
510
511        let batches = req.inputs.iter().map(|x| x.id).collect::<BTreeSet<_>>();
512        let input = input_id_range(batches);
513
514        Ok(CompactRes {
515            output: HollowBatch::new(
516                req.desc.clone(),
517                all_parts,
518                len,
519                all_run_meta,
520                all_run_splits,
521            ),
522            input,
523        })
524    }
525
526    pub async fn apply(
527        res: FueledMergeRes<T>,
528        metrics: &Metrics,
529        machine: &Machine<K, V, T, D>,
530    ) -> Result<RoutineMaintenance, anyhow::Error> {
531        let (apply_merge_result, maintenance) = machine.merge_res(&res).await;
532
533        match &apply_merge_result {
534            ApplyMergeResult::AppliedExact => {
535                metrics.compaction.applied.inc();
536                metrics.compaction.applied_exact_match.inc();
537                machine.applier.shard_metrics.compaction_applied.inc();
538            }
539            ApplyMergeResult::AppliedSubset => {
540                metrics.compaction.applied.inc();
541                metrics.compaction.applied_subset_match.inc();
542                machine.applier.shard_metrics.compaction_applied.inc();
543            }
544            ApplyMergeResult::NotAppliedNoMatch
545            | ApplyMergeResult::NotAppliedInvalidSince
546            | ApplyMergeResult::NotAppliedTooManyUpdates => {
547                if let ApplyMergeResult::NotAppliedTooManyUpdates = &apply_merge_result {
548                    metrics.compaction.not_applied_too_many_updates.inc();
549                }
550                metrics.compaction.noop.inc();
551                let mut part_deletes = PartDeletes::default();
552                for part in &res.output.parts {
553                    part_deletes.add(part);
554                }
555                part_deletes
556                    .delete(
557                        machine.applier.state_versions.blob.as_ref(),
558                        machine.shard_id(),
559                        GC_BLOB_DELETE_CONCURRENCY_LIMIT.get(&machine.applier.cfg),
560                        &*metrics,
561                        &metrics.retries.external.compaction_noop_delete,
562                    )
563                    .await;
564            }
565        };
566
567        Ok(maintenance)
568    }
569
570    /// Compacts input batches in bounded memory.
571    ///
572    /// The memory bound is broken into pieces:
573    ///     1. in-progress work
574    ///     2. fetching parts from runs
575    ///     3. additional in-flight requests to Blob
576    ///
577    /// 1. In-progress work is bounded by 2 * [BatchBuilderConfig::blob_target_size]. This
578    ///    usage is met at two mutually exclusive moments:
579    ///   * When reading in a part, we hold the columnar format in memory while writing its
580    ///     contents into a heap.
581    ///   * When writing a part, we hold a temporary updates buffer while encoding/writing
582    ///     it into a columnar format for Blob.
583    ///
584    /// 2. When compacting runs, only 1 part from each one is held in memory at a time.
585    ///    Compaction will determine an appropriate number of runs to compact together
586    ///    given the memory bound and accounting for the reservation in (1). A minimum
587    ///    of 2 * [BatchBuilderConfig::blob_target_size] of memory is expected, to be
588    ///    able to at least have the capacity to compact two runs together at a time,
589    ///    and more runs will be compacted together if more memory is available.
590    ///
591    /// 3. If there is excess memory after accounting for (1) and (2), we increase the
592    ///    number of outstanding parts we can keep in-flight to Blob.
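    ///
    /// As a worked example (numbers illustrative, not defaults): with a memory
    /// bound of 1 GiB and a blob target size of 128 MiB, (1) reserves 256 MiB,
    /// leaving 768 MiB for (2), i.e. roughly six runs' worth of parts per
    /// chunk; any slack left after a chunk is assembled is spent on (3) as
    /// additional outstanding parts.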
593    pub fn compact_stream(
594        cfg: CompactConfig,
595        blob: Arc<dyn Blob>,
596        metrics: Arc<Metrics>,
597        shard_metrics: Arc<ShardMetrics>,
598        isolated_runtime: Arc<IsolatedRuntime>,
599        req: CompactReq<T>,
600        write_schemas: Schemas<K, V>,
601        incremental_enabled: bool,
602    ) -> impl Stream<Item = Result<FueledMergeRes<T>, anyhow::Error>> {
603        async_stream::stream! {
604            let () = Self::validate_req(&req)?;
605
606            // We introduced a fast-path optimization in https://github.com/MaterializeInc/materialize/pull/15363
607            // but had to revert it due to a very scary bug. Here we count how many of our compaction reqs
608            // could be eligible for the optimization to better understand whether it's worth trying to
609            // reintroduce it.
610            let mut single_nonempty_batch = None;
611            for batch in &req.inputs {
612                if batch.batch.len > 0 {
613                    match single_nonempty_batch {
614                        None => single_nonempty_batch = Some(batch),
615                        Some(_previous_nonempty_batch) => {
616                            single_nonempty_batch = None;
617                            break;
618                        }
619                    }
620                }
621            }
622            if let Some(single_nonempty_batch) = single_nonempty_batch {
623                if single_nonempty_batch.batch.run_splits.len() == 0
624                    && single_nonempty_batch.batch.desc.since()
625                        != &Antichain::from_elem(T::minimum())
626                {
627                    metrics.compaction.fast_path_eligible.inc();
628                }
629            }
630
631            // Reserve space for the in-progress part, which is held both as an in-mem representation and in columnar form -
632            let in_progress_part_reserved_memory_bytes = 2 * cfg.batch.blob_target_size;
633            // - then remaining memory will go towards pulling down as many runs as we can.
634            // We'll always do at least two runs per chunk, which means we may go over this limit
635            // if parts are large or the limit is low... though we do at least increment a metric
636            // when that happens.
637            let run_reserved_memory_bytes = cfg
638                .compaction_memory_bound_bytes
639                .saturating_sub(in_progress_part_reserved_memory_bytes);
640
641            let chunked_runs = Self::chunk_runs(
642                &req,
643                &cfg,
644                &*metrics,
645                run_reserved_memory_bytes,
646                req.desc.since()
647            );
648            let total_chunked_runs = chunked_runs.len();
649
650            let parts_before = req.inputs.iter()
651                .map(|x| x.batch.parts.len()).sum::<usize>();
652            let parts_after = chunked_runs.iter()
653                .flat_map(|(_, _, runs, _)| {
654                    runs.iter().map(|(_, _, parts)| parts.len())
655                })
656                .sum::<usize>();
657            assert_eq!(
658                parts_before, parts_after,
659                "chunking should not change the number of parts",
660            );
661
662            for (applied, (input, desc, runs, run_chunk_max_memory_usage)) in
663                chunked_runs.into_iter().enumerate()
664            {
665                metrics.compaction.chunks_compacted.inc();
666                metrics
667                    .compaction
668                    .runs_compacted
669                    .inc_by(u64::cast_from(runs.len()));
670
671                // given the runs we actually have in our batch, we might have extra memory
672                // available. we reserved enough space to always have 1 in-progress part in
673                // flight, but if we have excess, we can use it to increase our write parallelism
674                let extra_outstanding_parts = (run_reserved_memory_bytes
675                    .saturating_sub(run_chunk_max_memory_usage))
676                    / cfg.batch.blob_target_size;
677                let mut run_cfg = cfg.clone();
678                run_cfg.batch.batch_builder_max_outstanding_parts = 1 + extra_outstanding_parts;
679
680                let desc = if incremental_enabled {
681                    desc
682                } else {
683                    req.desc.clone()
684                };
685
686                let runs = runs.iter()
687                    .map(|(desc, meta, run)| (*desc, *meta, *run))
688                    .collect::<Vec<_>>();
689
690                let batch = Self::compact_runs(
691                    &run_cfg,
692                    &req.shard_id,
693                    &desc,
694                    runs,
695                    Arc::clone(&blob),
696                    Arc::clone(&metrics),
697                    Arc::clone(&shard_metrics),
698                    Arc::clone(&isolated_runtime),
699                    write_schemas.clone(),
700                )
701                .await?;
702
703                assert!(
704                    (batch.len == 0 && batch.parts.len() == 0)
705                        || (batch.len > 0 && batch.parts.len() > 0),
706                    "updates={}, parts={}",
707                    batch.len,
708                    batch.parts.len(),
709                );
710
711                // Set up active compaction metadata
712                let clock = cfg.now.clone();
713                let active_compaction = if applied < total_chunked_runs - 1 {
714                    Some(ActiveCompaction { start_ms: clock() })
715                } else {
716                    None
717                };
718
719                let res = CompactRes {
720                    output: batch,
721                    input,
722                };
723
724                let res = FueledMergeRes {
725                    output: res.output,
726                    new_active_compaction: active_compaction,
727                    input: res.input,
728                };
729
730                yield Ok(res);
731            }
732        }
733    }
734
735    /// Compacts the input batches together, returning a single compacted batch.
736    /// Under the hood this just calls [Self::compact_stream] and
737    /// [Self::compact_all], but it is a convenience method that allows
738    /// the caller to not have to deal with the streaming API.
739    pub async fn compact(
740        cfg: CompactConfig,
741        blob: Arc<dyn Blob>,
742        metrics: Arc<Metrics>,
743        shard_metrics: Arc<ShardMetrics>,
744        isolated_runtime: Arc<IsolatedRuntime>,
745        req: CompactReq<T>,
746        write_schemas: Schemas<K, V>,
747    ) -> Result<CompactRes<T>, anyhow::Error> {
748        let stream = Self::compact_stream(
749            cfg,
750            Arc::clone(&blob),
751            Arc::clone(&metrics),
752            Arc::clone(&shard_metrics),
753            Arc::clone(&isolated_runtime),
754            req.clone(),
755            write_schemas,
756            false,
757        );
758
759        Self::compact_all(stream, req).await
760    }
761
762    /// Chunks runs with the following rules:
763    /// 1. Runs from multiple batches are allowed to be mixed as long as _every_ run in each
764    ///    such batch is present in the chunk.
765    /// 2. Otherwise, runs are split into chunks of runs from a single batch.
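    ///
    /// For example (shapes illustrative): two small batches A and B that fit
    /// the budget together become one mixed chunk covering both spine ids,
    /// while a single oversized batch C is split into several
    /// `CompactionInput::PartialBatch` chunks, each holding a consecutive
    /// group of C's runs that fits in memory.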
766    fn chunk_runs<'a>(
767        req: &'a CompactReq<T>,
768        cfg: &CompactConfig,
769        metrics: &Metrics,
770        run_reserved_memory_bytes: usize,
771        since: &Antichain<T>,
772    ) -> Vec<(
773        CompactionInput,
774        Description<T>,
775        Vec<(&'a Description<T>, &'a RunMeta, &'a [RunPart<T>])>,
776        usize,
777    )> {
778        // Assert that all of the inputs are contiguous / can be compacted together.
779        let _ = input_id_range(req.inputs.iter().map(|x| x.id).collect());
780
781        // Iterate through batches by spine id.
782        let mut batches: Vec<_> = req.inputs.iter().map(|x| (x.id, &*x.batch)).collect();
783        batches.sort_by_key(|(id, _)| *id);
784
785        let mut chunks = vec![];
786        let mut current_chunk_ids = BTreeSet::new();
787        let mut current_chunk_descs = Vec::new();
788        let mut current_chunk_runs = vec![];
789        let mut current_chunk_max_memory_usage = 0;
790
791        fn max_part_bytes<T>(parts: &[RunPart<T>], cfg: &CompactConfig) -> usize {
792            parts
793                .iter()
794                .map(|p| p.max_part_bytes())
795                .max()
796                .unwrap_or(cfg.batch.blob_target_size)
797        }
798
799        fn desc_range<T: Timestamp>(
800            descs: impl IntoIterator<Item = Description<T>>,
801            since: Antichain<T>,
802        ) -> Description<T> {
803            let mut descs = descs.into_iter();
804            let first = descs.next().expect("non-empty set of descriptions");
805            let lower = first.lower().clone();
806            let mut upper = first.upper().clone();
807            for desc in descs {
808                assert_eq!(&upper, desc.lower());
809                upper = desc.upper().clone();
810            }
811            let upper = upper.clone();
812            Description::new(lower, upper, since)
813        }
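
        // For example, `desc_range` turns contiguous descriptions [0, 5) and
        // [5, 10) with a since of 8 into a single description with lower 0,
        // upper 10, and since 8; non-contiguous inputs trip the assert above.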
814
815        for (spine_id, batch) in batches {
816            let batch_size = batch
817                .runs()
818                .map(|(_, parts)| max_part_bytes(parts, cfg))
819                .sum::<usize>();
820
821            let num_runs = batch.run_meta.len();
822
823            let runs = batch.runs().flat_map(|(meta, parts)| {
824                if meta.order.unwrap_or(RunOrder::Codec) == cfg.batch.preferred_order {
825                    Either::Left(std::iter::once((&batch.desc, meta, parts)))
826                } else {
827                    // The downstream consolidation step will handle a long run that's not in
828                    // the desired order by splitting it up into many single-element runs. This preserves
829                    // correctness, but it means that we may end up needing to iterate through
830                    // many more parts concurrently than expected, increasing memory use. Instead,
831                    // we break up those runs into individual batch parts, fetching hollow runs as
832                    // necessary, before they're grouped together to be passed to consolidation.
833                    // The downside is that this breaks the usual property that compaction produces
834                    // fewer runs than it takes in. This should generally be resolved by future
835                    // runs of compaction.
836                    soft_assert_or_log!(
837                        !parts.iter().any(|r| matches!(r, RunPart::Many(_))),
838                        "unexpected out-of-order hollow run"
839                    );
840                    Either::Right(
841                        parts
842                            .iter()
843                            .map(move |p| (&batch.desc, meta, std::slice::from_ref(p))),
844                    )
845                }
846            });
847
848            // Combine the given batch into the current chunk
849            // - if they fit within the memory budget,
850            // - or if together they have at most two runs, since otherwise compaction wouldn't make progress.
851            if current_chunk_max_memory_usage + batch_size <= run_reserved_memory_bytes
852                || current_chunk_runs.len() + num_runs <= 2
853            {
854                if current_chunk_max_memory_usage + batch_size > run_reserved_memory_bytes {
855                    // We've chosen to merge these batches together despite being over budget,
856                    // which should be rare.
857                    metrics.compaction.memory_violations.inc();
858                }
859                current_chunk_ids.insert(spine_id);
860                current_chunk_descs.push(batch.desc.clone());
861                current_chunk_runs.extend(runs);
862                current_chunk_max_memory_usage += batch_size;
863                continue;
864            }
865
866            // Otherwise, we cannot mix this batch partially. Flush any existing mixed chunk first.
867            if !current_chunk_ids.is_empty() {
868                chunks.push((
869                    input_id_range(std::mem::take(&mut current_chunk_ids)),
870                    desc_range(mem::take(&mut current_chunk_descs), since.clone()),
871                    std::mem::take(&mut current_chunk_runs),
872                    current_chunk_max_memory_usage,
873                ));
874                current_chunk_max_memory_usage = 0;
875            }
876
877            // If the batch fits within limits, try and accumulate future batches into it.
878            if batch_size <= run_reserved_memory_bytes {
879                current_chunk_ids.insert(spine_id);
880                current_chunk_descs.push(batch.desc.clone());
881                current_chunk_runs.extend(runs);
882                current_chunk_max_memory_usage += batch_size;
883                continue;
884            }
885
886            // This batch is too large to compact with others, or even in a single go.
887            // Process this batch alone, splitting into single-batch chunks as needed.
888            let mut run_iter = runs.into_iter().peekable();
889            debug_assert!(current_chunk_ids.is_empty());
890            debug_assert!(current_chunk_descs.is_empty());
891            debug_assert!(current_chunk_runs.is_empty());
892            debug_assert_eq!(current_chunk_max_memory_usage, 0);
893            let mut current_chunk_run_ids = BTreeSet::new();
894
895            while let Some((desc, meta, parts)) = run_iter.next() {
896                let run_size = max_part_bytes(parts, cfg);
897                current_chunk_runs.push((desc, meta, parts));
898                current_chunk_max_memory_usage += run_size;
899                current_chunk_run_ids.extend(meta.id);
900
901                if let Some((_, _meta, next_parts)) = run_iter.peek() {
902                    let next_size = max_part_bytes(next_parts, cfg);
903                    if current_chunk_max_memory_usage + next_size > run_reserved_memory_bytes {
904                        // If the current chunk only has one run, record a memory violation metric.
905                        if current_chunk_runs.len() == 1 {
906                            metrics.compaction.memory_violations.inc();
907                            continue;
908                        }
909                        // Flush the current chunk and start a new one.
910                        chunks.push((
911                            CompactionInput::PartialBatch(
912                                spine_id,
913                                mem::take(&mut current_chunk_run_ids),
914                            ),
915                            desc_range([batch.desc.clone()], since.clone()),
916                            std::mem::take(&mut current_chunk_runs),
917                            current_chunk_max_memory_usage,
918                        ));
919                        current_chunk_max_memory_usage = 0;
920                    }
921                }
922            }
923
924            if !current_chunk_runs.is_empty() {
925                chunks.push((
926                    CompactionInput::PartialBatch(spine_id, mem::take(&mut current_chunk_run_ids)),
927                    desc_range([batch.desc.clone()], since.clone()),
928                    std::mem::take(&mut current_chunk_runs),
929                    current_chunk_max_memory_usage,
930                ));
931                current_chunk_max_memory_usage = 0;
932            }
933        }
934
935        // If we ended with a mixed-batch chunk in progress, flush it.
936        if !current_chunk_ids.is_empty() {
937            chunks.push((
938                input_id_range(current_chunk_ids),
939                desc_range(current_chunk_descs, since.clone()),
940                current_chunk_runs,
941                current_chunk_max_memory_usage,
942            ));
943        }
944
945        chunks
946    }
947
948    /// Compacts runs together. If the input runs are sorted, a single run will be created as output.
949    ///
950    /// Maximum possible memory usage is `(# runs + 2) * [crate::PersistConfig::blob_target_size]`
951    pub(crate) async fn compact_runs(
952        cfg: &CompactConfig,
953        shard_id: &ShardId,
954        desc: &Description<T>,
955        runs: Vec<(&Description<T>, &RunMeta, &[RunPart<T>])>,
956        blob: Arc<dyn Blob>,
957        metrics: Arc<Metrics>,
958        shard_metrics: Arc<ShardMetrics>,
959        isolated_runtime: Arc<IsolatedRuntime>,
960        write_schemas: Schemas<K, V>,
961    ) -> Result<HollowBatch<T>, anyhow::Error> {
962        // TODO: Figure out a more principled way to allocate our memory budget.
963        // Currently, we give any excess budget to write parallelism. If we had
964        // to pick between 100% towards writes vs 100% towards reads, then reads
965        // is almost certainly better, but the ideal is probably somewhere in
966        // between the two.
967        //
968        // For now, invent some extra budget out of thin air for prefetch.
969        let prefetch_budget_bytes = 2 * cfg.batch.blob_target_size;
970
971        let mut timings = Timings::default();
972
973        let mut batch_cfg = cfg.batch.clone();
974
975        // Use compaction as a method of getting inline writes out of state, to
976        // make room for more inline writes. We could instead do this at the end
977        // of compaction by flushing out the batch, but doing it here based on
978        // the config allows BatchBuilder to do its normal pipelining of writes.
979        batch_cfg.inline_writes_single_max_bytes = 0;
980
981        let parts = BatchParts::new_ordered::<D>(
982            batch_cfg,
983            cfg.batch.preferred_order,
984            Arc::clone(&metrics),
985            Arc::clone(&shard_metrics),
986            *shard_id,
987            Arc::clone(&blob),
988            Arc::clone(&isolated_runtime),
989            &metrics.compaction.batch,
990        );
991        let mut batch = BatchBuilderInternal::<K, V, T, D>::new(
992            cfg.batch.clone(),
993            parts,
994            Arc::clone(&metrics),
995            write_schemas.clone(),
996            Arc::clone(&blob),
997            shard_id.clone(),
998            cfg.version.clone(),
999        );
1000
1001        let mut consolidator = Consolidator::new(
1002            format!(
1003                "{}[lower={:?},upper={:?}]",
1004                shard_id,
1005                desc.lower().elements(),
1006                desc.upper().elements()
1007            ),
1008            cfg.fetch_config.clone(),
1009            *shard_id,
1010            StructuredSort::<K, V, T, D>::new(write_schemas.clone()),
1011            blob,
1012            Arc::clone(&metrics),
1013            shard_metrics,
1014            metrics.read.compaction.clone(),
1015            FetchBatchFilter::Compaction {
1016                since: desc.since().clone(),
1017            },
1018            None,
1019            prefetch_budget_bytes,
1020        );
1021
1022        for (desc, meta, parts) in runs {
1023            consolidator.enqueue_run(desc, meta, parts.iter().cloned());
1024        }
1025
1026        let remaining_budget = consolidator.start_prefetches();
1027        if remaining_budget.is_none() {
1028            metrics.compaction.not_all_prefetched.inc();
1029        }
1030
1031        loop {
1032            let mut chunks = vec![];
1033            let mut total_bytes = 0;
1034            // We attempt to pull chunks out of the consolidator that match our target size,
1035            // but it's possible that we may get smaller chunks... for example, if not all
1036            // parts have been fetched yet. Loop until we've got enough data to justify flushing
1037            // it out to blob (or we run out of data.)
1038            while total_bytes < cfg.batch.blob_target_size {
1039                let fetch_start = Instant::now();
1040                let Some(chunk) = consolidator
1041                    .next_chunk(
1042                        cfg.compaction_yield_after_n_updates,
1043                        cfg.batch.blob_target_size - total_bytes,
1044                    )
1045                    .await?
1046                else {
1047                    break;
1048                };
1049                timings.part_fetching += fetch_start.elapsed();
1050                total_bytes += chunk.goodbytes();
1051                chunks.push(chunk);
1052                tokio::task::yield_now().await;
1053            }
1054
1055            // In the hopefully-common case of a single chunk, this will not copy.
1056            let Some(updates) = Part::concat(&chunks).expect("compaction produces well-typed data")
1057            else {
1058                break;
1059            };
1060            batch.flush_part(desc.clone(), updates).await;
1061        }
1062        let mut batch = batch.finish(desc.clone()).await?;
1063
1064        // We use compaction as a method of getting inline writes out of state,
1065        // to make room for more inline writes. This happens in
1066        // `CompactConfig::new` by overriding the inline writes threshold
1067        // config. This is a bit action-at-a-distance, so defensively detect if
1068        // this breaks here and log and correct it if so.
1069        let has_inline_parts = batch.batch.parts.iter().any(|x| x.is_inline());
1070        if has_inline_parts {
1071            error!(%shard_id, ?cfg, "compaction result unexpectedly had inline writes");
1072            let () = batch
1073                .flush_to_blob(
1074                    &cfg.batch,
1075                    &metrics.compaction.batch,
1076                    &isolated_runtime,
1077                    &write_schemas,
1078                )
1079                .await;
1080        }
1081
1082        timings.record(&metrics);
1083        Ok(batch.into_hollow_batch())
1084    }
1085
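    /// Checks that the inputs tile the output description exactly.
    ///
    /// For example, inputs [0, 5) and [5, 10) are a valid tiling of an output
    /// of [0, 10), while a gap like [0, 5) then [6, 10), or an input whose
    /// since is ahead of the output's since, is rejected.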
1086    fn validate_req(req: &CompactReq<T>) -> Result<(), anyhow::Error> {
1087        let mut frontier = req.desc.lower();
1088        for input in req.inputs.iter() {
1089            if PartialOrder::less_than(req.desc.since(), input.batch.desc.since()) {
1090                return Err(anyhow!(
1091                    "output since {:?} must be at or in advance of input since {:?}",
1092                    req.desc.since(),
1093                    input.batch.desc.since()
1094                ));
1095            }
1096            if frontier != input.batch.desc.lower() {
1097                return Err(anyhow!(
1098                    "invalid merge of non-consecutive batches {:?} vs {:?}",
1099                    frontier,
1100                    input.batch.desc.lower()
1101                ));
1102            }
1103            frontier = input.batch.desc.upper();
1104        }
1105        if frontier != req.desc.upper() {
1106            return Err(anyhow!(
1107                "invalid merge of non-consecutive batches {:?} vs {:?}",
1108                frontier,
1109                req.desc.upper()
1110            ));
1111        }
1112        Ok(())
1113    }
1114}
1115
1116#[derive(Debug, Default)]
1117struct Timings {
1118    part_fetching: Duration,
1119    heap_population: Duration,
1120}
1121
1122impl Timings {
1123    fn record(self, metrics: &Metrics) {
1124        // intentionally deconstruct so we don't forget to consider each field
1125        let Timings {
1126            part_fetching,
1127            heap_population,
1128        } = self;
1129
1130        metrics
1131            .compaction
1132            .steps
1133            .part_fetch_seconds
1134            .inc_by(part_fetching.as_secs_f64());
1135        metrics
1136            .compaction
1137            .steps
1138            .heap_population_seconds
1139            .inc_by(heap_population.as_secs_f64());
1140    }
1141}
1142
1143#[cfg(test)]
1144mod tests {
1145    use mz_dyncfg::ConfigUpdates;
1146    use mz_ore::{assert_contains, assert_err};
1147    use mz_persist_types::codec_impls::StringSchema;
1148    use timely::progress::Antichain;
1149
1150    use crate::PersistLocation;
1151    use crate::batch::BLOB_TARGET_SIZE;
1152    use crate::internal::trace::SpineId;
1153    use crate::tests::{all_ok, expect_fetch_part, new_test_client_cache};
1154
1155    use super::*;
1156
1157    // A regression test for a bug caught during development of materialize#13160 (never
1158    // made it to main) where batches written by compaction would always have a
1159    // since of the minimum timestamp.
1160    #[mz_persist_proc::test(tokio::test)]
1161    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
1162    async fn regression_minimum_since(dyncfgs: ConfigUpdates) {
1163        let data = vec![
1164            (("0".to_owned(), "zero".to_owned()), 0, 1),
1165            (("0".to_owned(), "zero".to_owned()), 1, -1),
1166            (("1".to_owned(), "one".to_owned()), 1, 1),
1167        ];
1168
1169        let cache = new_test_client_cache(&dyncfgs);
1170        cache.cfg.set_config(&BLOB_TARGET_SIZE, 100);
1171        let (mut write, _) = cache
1172            .open(PersistLocation::new_in_mem())
1173            .await
1174            .expect("client construction failed")
1175            .expect_open::<String, String, u64, i64>(ShardId::new())
1176            .await;
1177        let b0 = write
1178            .expect_batch(&data[..1], 0, 1)
1179            .await
1180            .into_hollow_batch();
1181        let b1 = write
1182            .expect_batch(&data[1..], 1, 2)
1183            .await
1184            .into_hollow_batch();
1185
1186        let req = CompactReq {
1187            shard_id: write.machine.shard_id(),
1188            desc: Description::new(
1189                b0.desc.lower().clone(),
1190                b1.desc.upper().clone(),
1191                Antichain::from_elem(10u64),
1192            ),
1193            inputs: vec![
1194                IdHollowBatch {
1195                    batch: Arc::new(b0),
1196                    id: SpineId(0, 1),
1197                },
1198                IdHollowBatch {
1199                    batch: Arc::new(b1),
1200                    id: SpineId(1, 2),
1201                },
1202            ],
1203        };
1204        let schemas = Schemas {
1205            id: None,
1206            key: Arc::new(StringSchema),
1207            val: Arc::new(StringSchema),
1208        };
1209        let res = Compactor::<String, String, u64, i64>::compact(
1210            CompactConfig::new(&write.cfg, write.shard_id()),
1211            Arc::clone(&write.blob),
1212            Arc::clone(&write.metrics),
1213            write.metrics.shards.shard(&write.machine.shard_id(), ""),
1214            Arc::new(IsolatedRuntime::new_for_tests()),
1215            req.clone(),
1216            schemas.clone(),
1217        )
1218        .await
1219        .expect("compaction failed");
1220
1221        assert_eq!(res.output.desc, req.desc);
1222        assert_eq!(res.output.len, 1);
1223        assert_eq!(res.output.part_count(), 1);
1224        let part = res.output.parts[0].expect_hollow_part();
1225        let (part, updates) = expect_fetch_part(
1226            write.blob.as_ref(),
1227            &part.key.complete(&write.machine.shard_id()),
1228            &write.metrics,
1229            &schemas,
1230        )
1231        .await;
1232        assert_eq!(part.desc, res.output.desc);
1233        assert_eq!(updates, all_ok(&data, 10));
1234    }
1235
1236    #[mz_persist_proc::test(tokio::test)]
1237    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
1238    async fn disable_compaction(dyncfgs: ConfigUpdates) {
1239        let data = [
1240            (("0".to_owned(), "zero".to_owned()), 0, 1),
1241            (("0".to_owned(), "zero".to_owned()), 1, -1),
1242            (("1".to_owned(), "one".to_owned()), 1, 1),
1243        ];
1244
1245        let cache = new_test_client_cache(&dyncfgs);
1246        cache.cfg.set_config(&BLOB_TARGET_SIZE, 100);
1247        let (mut write, _) = cache
1248            .open(PersistLocation::new_in_mem())
1249            .await
1250            .expect("client construction failed")
1251            .expect_open::<String, String, u64, i64>(ShardId::new())
1252            .await;
1253        let b0 = write
1254            .expect_batch(&data[..1], 0, 1)
1255            .await
1256            .into_hollow_batch();
1257        let b1 = write
1258            .expect_batch(&data[1..], 1, 2)
1259            .await
1260            .into_hollow_batch();
1261
1262        let req = CompactReq {
1263            shard_id: write.machine.shard_id(),
1264            desc: Description::new(
1265                b0.desc.lower().clone(),
1266                b1.desc.upper().clone(),
1267                Antichain::from_elem(10u64),
1268            ),
1269            inputs: vec![
1270                IdHollowBatch {
1271                    batch: Arc::new(b0),
1272                    id: SpineId(0, 1),
1273                },
1274                IdHollowBatch {
1275                    batch: Arc::new(b1),
1276                    id: SpineId(1, 2),
1277                },
1278            ],
1279        };
1280        write.cfg.set_config(&COMPACTION_HEURISTIC_MIN_INPUTS, 1);
1281        let compactor = write.compact.as_ref().expect("compaction hard disabled");
1282
1283        write.cfg.disable_compaction();
1284        let result = compactor
1285            .compact_and_apply_background(req.clone(), &write.machine)
1286            .expect("listener")
1287            .await
1288            .expect("channel closed");
1289        assert_err!(result);
1290        assert_contains!(result.unwrap_err().to_string(), "compaction disabled");
1291
1292        write.cfg.enable_compaction();
1293        compactor
1294            .compact_and_apply_background(req, &write.machine)
1295            .expect("listener")
1296            .await
1297            .expect("channel closed")
1298            .expect("compaction success");
1299
1300        // Make sure our CYA dyncfg works.
1301        let data2 = [
1302            (("2".to_owned(), "two".to_owned()), 2, 1),
1303            (("2".to_owned(), "two".to_owned()), 3, -1),
1304            (("3".to_owned(), "three".to_owned()), 3, 1),
1305        ];
1306
1307        let b2 = write
1308            .expect_batch(&data2[..1], 2, 3)
1309            .await
1310            .into_hollow_batch();
1311        let b3 = write
1312            .expect_batch(&data2[1..], 3, 4)
1313            .await
1314            .into_hollow_batch();
1315
1316        let req = CompactReq {
1317            shard_id: write.machine.shard_id(),
1318            desc: Description::new(
1319                b2.desc.lower().clone(),
1320                b3.desc.upper().clone(),
1321                Antichain::from_elem(20u64),
1322            ),
1323            inputs: vec![
1324                IdHollowBatch {
1325                    batch: Arc::new(b2),
1326                    id: SpineId(0, 1),
1327                },
1328                IdHollowBatch {
1329                    batch: Arc::new(b3),
1330                    id: SpineId(1, 2),
1331                },
1332            ],
1333        };
1334        let compactor = write.compact.as_ref().expect("compaction hard disabled");
1335
1336        // When the dyncfg is set to false we should ignore the process flag.
1337        write.cfg.set_config(&COMPACTION_CHECK_PROCESS_FLAG, false);
1338        write.cfg.disable_compaction();
1339        // Compaction still succeeded!
1340        compactor
1341            .compact_and_apply_background(req, &write.machine)
1342            .expect("listener")
1343            .await
1344            .expect("channel closed")
1345            .expect("compaction success");
1346    }
1347}