mz_persist_client/internal/compact.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::collections::BTreeSet;
use std::fmt::Debug;
use std::marker::PhantomData;
use std::mem;
use std::sync::Arc;
use std::time::{Duration, Instant};

use anyhow::anyhow;
use differential_dataflow::difference::Monoid;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::trace::Description;
use futures::{Stream, pin_mut};
use futures_util::StreamExt;
use itertools::Either;
use mz_dyncfg::Config;
use mz_ore::cast::CastFrom;
use mz_ore::error::ErrorExt;
use mz_ore::now::NowFn;
use mz_ore::soft_assert_or_log;
use mz_persist::location::Blob;
use mz_persist_types::part::Part;
use mz_persist_types::{Codec, Codec64};
use timely::PartialOrder;
use timely::progress::{Antichain, Timestamp};
use tokio::sync::mpsc::Sender;
use tokio::sync::{TryAcquireError, mpsc, oneshot};
use tracing::{Instrument, Span, debug, debug_span, error, trace, warn};

use crate::async_runtime::IsolatedRuntime;
use crate::batch::{BatchBuilderConfig, BatchBuilderInternal, BatchParts, PartDeletes};
use crate::cfg::{
    COMPACTION_HEURISTIC_MIN_INPUTS, COMPACTION_HEURISTIC_MIN_PARTS,
    COMPACTION_HEURISTIC_MIN_UPDATES, COMPACTION_MEMORY_BOUND_BYTES,
    GC_BLOB_DELETE_CONCURRENCY_LIMIT, MiB,
};
use crate::fetch::{FetchBatchFilter, FetchConfig};
use crate::internal::encoding::Schemas;
use crate::internal::gc::GarbageCollector;
use crate::internal::machine::Machine;
use crate::internal::maintenance::RoutineMaintenance;
use crate::internal::metrics::ShardMetrics;
use crate::internal::state::{HollowBatch, RunMeta, RunOrder, RunPart};
use crate::internal::trace::{
    ActiveCompaction, ApplyMergeResult, CompactionInput, FueledMergeRes, IdHollowBatch, SpineId,
    id_range,
};
use crate::iter::{Consolidator, StructuredSort};
use crate::{Metrics, PersistConfig, ShardId};

/// A request for compaction.
///
/// This is similar to FueledMergeReq, but intentionally a different type. If we
/// move compaction to an rpc server, this one will become a protobuf; the type
/// parameters will become names of codecs to look up in some registry.
#[derive(Debug, Clone)]
pub struct CompactReq<T> {
    /// The shard the input and output batches belong to.
    pub shard_id: ShardId,
    /// A description for the output batch.
    pub desc: Description<T>,
    /// The updates to include in the output batch. Any data in these outside of
    /// the output description's bounds should be ignored.
    pub inputs: Vec<IdHollowBatch<T>>,
}

/// A response from compaction.
#[derive(Debug)]
pub struct CompactRes<T> {
    /// The compacted batch.
    pub output: HollowBatch<T>,
    /// The runs that were compacted together to produce the output batch.
    pub input: CompactionInput,
}

/// A snapshot of dynamic configs to make it easier to reason about an
/// individual run of compaction.
#[derive(Debug, Clone)]
pub struct CompactConfig {
    pub(crate) compaction_memory_bound_bytes: usize,
    pub(crate) compaction_yield_after_n_updates: usize,
    pub(crate) version: semver::Version,
    pub(crate) batch: BatchBuilderConfig,
    pub(crate) fetch_config: FetchConfig,
    pub(crate) now: NowFn,
}

impl CompactConfig {
    /// Initialize the compaction config from Persist configuration.
    pub fn new(value: &PersistConfig, shard_id: ShardId) -> Self {
        CompactConfig {
            compaction_memory_bound_bytes: COMPACTION_MEMORY_BOUND_BYTES.get(value),
            compaction_yield_after_n_updates: value.compaction_yield_after_n_updates,
            version: value.build_version.clone(),
            batch: BatchBuilderConfig::new(value, shard_id),
            fetch_config: FetchConfig::from_persist_config(value),
            now: value.now.clone(),
        }
    }
}

/// A service for performing physical and logical compaction.
///
/// This will possibly be called over RPC in the future. Physical compaction is
/// merging adjacent batches. Logical compaction is advancing timestamps to a
/// new since and consolidating the resulting updates.
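///
/// As an illustrative example of logical compaction: updates `("k", 3, +1)` and
/// `("k", 5, -1)` advanced to a since of `10` both land at time `10` and, because
/// their diffs sum to zero, consolidate away entirely.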
#[derive(Debug)]
pub struct Compactor<K, V, T, D> {
    cfg: PersistConfig,
    metrics: Arc<Metrics>,
    sender: Sender<(
        Instant,
        CompactReq<T>,
        Machine<K, V, T, D>,
        oneshot::Sender<Result<(), anyhow::Error>>,
    )>,
    _phantom: PhantomData<fn() -> D>,
}

impl<K, V, T, D> Clone for Compactor<K, V, T, D> {
    fn clone(&self) -> Self {
        Compactor {
            cfg: self.cfg.clone(),
            metrics: Arc::clone(&self.metrics),
            sender: self.sender.clone(),
            _phantom: Default::default(),
        }
    }
}

/// In Compactor::compact_and_apply_background, the minimum amount of time to
/// allow a compaction request to run before timing it out. A request may be
/// given a timeout greater than this value depending on the inputs' size.
pub(crate) const COMPACTION_MINIMUM_TIMEOUT: Config<Duration> = Config::new(
    "persist_compaction_minimum_timeout",
    Duration::from_secs(90),
    "\
    The minimum amount of time to allow a persist compaction request to run \
    before timing it out (Materialize).",
);

pub(crate) const COMPACTION_CHECK_PROCESS_FLAG: Config<bool> = Config::new(
    "persist_compaction_check_process_flag",
    true,
    "Whether Compactor will obey the process_requests flag in PersistConfig, \
        which allows dynamically disabling compaction. If false, all compaction requests will be processed.",
);

/// Create a [`CompactionInput::IdRange`] from a set of `SpineId`s.
fn input_id_range(ids: BTreeSet<SpineId>) -> CompactionInput {
    let id = id_range(ids);

    CompactionInput::IdRange(id)
}

impl<K, V, T, D> Compactor<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64 + Sync,
    D: Monoid + Ord + Codec64 + Send + Sync,
{
    pub fn new(
        cfg: PersistConfig,
        metrics: Arc<Metrics>,
        gc: GarbageCollector<K, V, T, D>,
    ) -> Self {
        let (compact_req_sender, mut compact_req_receiver) = mpsc::channel::<(
            Instant,
            CompactReq<T>,
            Machine<K, V, T, D>,
            oneshot::Sender<Result<(), anyhow::Error>>,
        )>(cfg.compaction_queue_size);
        let concurrency_limit = Arc::new(tokio::sync::Semaphore::new(
            cfg.compaction_concurrency_limit,
        ));
        let check_process_requests = COMPACTION_CHECK_PROCESS_FLAG.handle(&cfg.configs);
        let process_requests = Arc::clone(&cfg.compaction_process_requests);

        // spin off a single task responsible for executing compaction requests.
        // work is enqueued into the task through a channel
        let _worker_handle = mz_ore::task::spawn(|| "PersistCompactionScheduler", async move {
            while let Some((enqueued, req, machine, completer)) = compact_req_receiver.recv().await
            {
                assert_eq!(req.shard_id, machine.shard_id());
                let metrics = Arc::clone(&machine.applier.metrics);

                // Only allow skipping compaction requests if the dyncfg is enabled.
                if check_process_requests.get()
                    && !process_requests.load(std::sync::atomic::Ordering::Relaxed)
                {
                    // Respond to the requester, track in our metrics, and log
                    // that compaction is disabled.
                    let _ = completer.send(Err(anyhow::anyhow!("compaction disabled")));
                    metrics.compaction.disabled.inc();
                    tracing::warn!(shard_id = ?req.shard_id, "Dropping compaction request on the floor.");

                    continue;
                }

                let permit = {
                    let inner = Arc::clone(&concurrency_limit);
                    // perform a non-blocking attempt to acquire a permit so we can
                    // record how often we're ever blocked on the concurrency limit
                    match inner.try_acquire_owned() {
                        Ok(permit) => permit,
                        Err(TryAcquireError::NoPermits) => {
                            metrics.compaction.concurrency_waits.inc();
                            Arc::clone(&concurrency_limit)
                                .acquire_owned()
                                .await
                                .expect("semaphore is never closed")
                        }
                        Err(TryAcquireError::Closed) => {
                            // should never happen in practice. the semaphore is
                            // never explicitly closed, nor will it close on Drop
                            warn!("semaphore for shard {} is closed", machine.shard_id());
                            continue;
                        }
                    }
                };
                metrics
                    .compaction
                    .queued_seconds
                    .inc_by(enqueued.elapsed().as_secs_f64());

                let compact_span =
                    debug_span!(parent: None, "compact::apply", shard_id=%machine.shard_id());
                compact_span.follows_from(&Span::current());
                let gc = gc.clone();
                mz_ore::task::spawn(|| "PersistCompactionWorker", async move {
                    let res = Self::compact_and_apply(&machine, req)
                        .instrument(compact_span)
                        .await;

                    match res {
                        Ok(maintenance) => maintenance.start_performing(&machine, &gc),
                        Err(err) => {
                            debug!(shard_id =? machine.shard_id(), "compaction failed: {err:#}")
                        }
                    }

                    // we can safely ignore errors here; it's possible the caller
                    // wasn't interested in waiting and dropped their receiver
                    let _ = completer.send(Ok(()));

                    // moves `permit` into async scope so it can be dropped upon completion
                    drop(permit);
                });
            }
        });

        Compactor {
            cfg,
            metrics,
            sender: compact_req_sender,
            _phantom: PhantomData,
        }
    }

    /// Enqueues a [CompactReq] to be consumed by the compaction background task when available.
    ///
    /// Returns a receiver that indicates when compaction has completed. The receiver can be
    /// safely dropped at any time if the caller does not wish to wait on completion.
    pub fn compact_and_apply_background(
        &self,
        req: CompactReq<T>,
        machine: &Machine<K, V, T, D>,
    ) -> Option<oneshot::Receiver<Result<(), anyhow::Error>>> {
        // Run some initial heuristics to ignore some requests for compaction.
        // We don't gain much from e.g. compacting two very small batches that
        // were just written, but it does result in non-trivial blob traffic
        // (especially in aggregate). This heuristic is something we'll need to
        // tune over time.
        let should_compact = req.inputs.len() >= COMPACTION_HEURISTIC_MIN_INPUTS.get(&self.cfg)
            || req
                .inputs
                .iter()
                .map(|x| x.batch.part_count())
                .sum::<usize>()
                >= COMPACTION_HEURISTIC_MIN_PARTS.get(&self.cfg)
            || req.inputs.iter().map(|x| x.batch.len).sum::<usize>()
                >= COMPACTION_HEURISTIC_MIN_UPDATES.get(&self.cfg);
        if !should_compact {
            self.metrics.compaction.skipped.inc();
            return None;
        }

        let (compaction_completed_sender, compaction_completed_receiver) = oneshot::channel();
        let new_compaction_sender = self.sender.clone();

        self.metrics.compaction.requested.inc();
        // NB: we intentionally pass along the input machine, as it ought to come from the
        // writer that generated the compaction request / maintenance. this machine has a
        // spine structure that generated the request, so it has a much better chance of
        // merging and committing the result than a machine kept up-to-date through state
        // diffs, which may have a different spine structure less amenable to merging.
        let send = new_compaction_sender.try_send((
            Instant::now(),
            req,
            machine.clone(),
            compaction_completed_sender,
        ));
        if let Err(_) = send {
            self.metrics.compaction.dropped.inc();
            return None;
        }

        Some(compaction_completed_receiver)
    }

    pub(crate) async fn compact_and_apply(
        machine: &Machine<K, V, T, D>,
        req: CompactReq<T>,
    ) -> Result<RoutineMaintenance, anyhow::Error> {
        let metrics = Arc::clone(&machine.applier.metrics);
        metrics.compaction.started.inc();
        let start = Instant::now();

        // pick a timeout for our compaction request proportional to the amount
        // of data that must be read (with a minimum set by PersistConfig)
        let total_input_bytes = req
            .inputs
            .iter()
            .map(|batch| batch.batch.encoded_size_bytes())
            .sum::<usize>();
        let timeout = Duration::max(
            // either our minimum timeout
            COMPACTION_MINIMUM_TIMEOUT.get(&machine.applier.cfg),
            // or 1s per MB of input data
            Duration::from_secs(u64::cast_from(total_input_bytes / MiB)),
        );
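        // e.g. with the 90s default minimum, a 50 MiB input still gets the 90s
        // timeout, while a 600 MiB input gets ~600s (illustrative numbers).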
        // always use the most recent schema from all the Runs we're compacting to prevent
        // Compactors created before the schema was evolved from trying to "de-evolve" a Part.
        let Some(compaction_schema_id) = req
            .inputs
            .iter()
            .flat_map(|batch| batch.batch.run_meta.iter())
            .filter_map(|run_meta| run_meta.schema)
            // It's an invariant that SchemaIds are ordered.
            .max()
        else {
            metrics.compaction.schema_selection.no_schema.inc();
            metrics.compaction.failed.inc();
            return Err(anyhow!(
                "compacting {shard_id} and spine ids {spine_ids}: could not determine schema id from inputs",
                shard_id = req.shard_id,
                spine_ids = mz_ore::str::separated(", ", req.inputs.iter().map(|i| i.id))
            ));
        };
        let Some((key_schema, val_schema)) = machine.get_schema(compaction_schema_id) else {
            metrics.compaction.schema_selection.no_schema.inc();
            metrics.compaction.failed.inc();
            return Err(anyhow!(
                "compacting {shard_id} and spine ids {spine_ids}: schema id {compaction_schema_id} not present in machine state",
                shard_id = req.shard_id,
                spine_ids = mz_ore::str::separated(", ", req.inputs.iter().map(|i| i.id))
            ));
        };

        metrics.compaction.schema_selection.recent_schema.inc();

        let compaction_schema = Schemas {
            id: Some(compaction_schema_id),
            key: Arc::new(key_schema),
            val: Arc::new(val_schema),
        };

        trace!(
            "compaction request for {}MBs ({} bytes), with timeout of {}s, and schema {:?}.",
            total_input_bytes / MiB,
            total_input_bytes,
            timeout.as_secs_f64(),
            compaction_schema.id,
        );

        let isolated_runtime = Arc::clone(&machine.isolated_runtime);
        let machine_clone = machine.clone();
        let metrics_clone = Arc::clone(&machine.applier.metrics);
        let compact_span = debug_span!("compact::consolidate");
        let res = tokio::time::timeout(
            timeout,
            // Compaction is cpu intensive, so be polite and spawn it on the isolated runtime.
            isolated_runtime.spawn_named(
                || "persist::compact::consolidate",
                async move {
                    // If the batches we are compacting were written with old versions of persist,
                    // we may not have run UUIDs for them, meaning we don't have enough info to
                    // safely compact them incrementally.
                    let all_runs_have_uuids = req
                        .inputs
                        .iter()
                        .all(|x| x.batch.runs().all(|(meta, _)| meta.id.is_some()));
                    let all_runs_have_len = req
                        .inputs
                        .iter()
                        .all(|x| x.batch.runs().all(|(meta, _)| meta.len.is_some()));

                    let compact_cfg =
                        CompactConfig::new(&machine_clone.applier.cfg, machine_clone.shard_id());
                    let incremental_enabled = compact_cfg.batch.enable_incremental_compaction
                        && all_runs_have_uuids
                        && all_runs_have_len;
                    let stream = Self::compact_stream(
                        compact_cfg,
                        Arc::clone(&machine_clone.applier.state_versions.blob),
                        Arc::clone(&metrics_clone),
                        Arc::clone(&machine_clone.applier.shard_metrics),
                        Arc::clone(&machine_clone.isolated_runtime),
                        req.clone(),
                        compaction_schema,
                        incremental_enabled,
                    );

                    let maintenance = if incremental_enabled {
                        let mut maintenance = RoutineMaintenance::default();
                        pin_mut!(stream);
                        while let Some(res) = stream.next().await {
                            let res = res?;
                            let new_maintenance =
                                Self::apply(res, &metrics_clone, &machine_clone).await?;
                            maintenance.merge(new_maintenance);
                        }
                        maintenance
                    } else {
                        let res = Self::compact_all(stream, req.clone()).await?;
                        Self::apply(
                            FueledMergeRes {
                                output: res.output,
                                input: CompactionInput::Legacy,
                                new_active_compaction: None,
                            },
                            &metrics_clone,
                            &machine_clone,
                        )
                        .await?
                    };

                    Ok::<_, anyhow::Error>(maintenance)
                }
                .instrument(compact_span),
            ),
        )
        .await;

        metrics
            .compaction
            .seconds
            .inc_by(start.elapsed().as_secs_f64());
        let res = res.map_err(|e| {
            metrics.compaction.timed_out.inc();
            anyhow!(
                "compaction timed out after {}s: {}",
                timeout.as_secs_f64(),
                e
            )
        })?;

        match res {
            Ok(maintenance) => Ok(maintenance),
            Err(err) => {
                metrics.compaction.failed.inc();
                debug!(
                    "compaction for {} failed: {}",
                    machine.shard_id(),
                    err.display_with_causes()
                );
                Err(err)
            }
        }
    }

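    /// Collects every batch produced by a compaction stream into a single [CompactRes]
    /// whose output covers the request's full description.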
    pub async fn compact_all(
        stream: impl Stream<Item = Result<FueledMergeRes<T>, anyhow::Error>>,
        req: CompactReq<T>,
    ) -> Result<CompactRes<T>, anyhow::Error> {
        pin_mut!(stream);

        let mut all_parts = vec![];
        let mut all_run_splits = vec![];
        let mut all_run_meta = vec![];
        let mut len = 0;

        while let Some(res) = stream.next().await {
            let res = res?.output;
            let (parts, updates, run_meta, run_splits) =
                (res.parts, res.len, res.run_meta, res.run_splits);

            if updates == 0 {
                continue;
            }

            let run_offset = all_parts.len();
            if !all_parts.is_empty() {
                all_run_splits.push(run_offset);
            }
            all_run_splits.extend(run_splits.iter().map(|r| r + run_offset));
            all_run_meta.extend(run_meta);
            all_parts.extend(parts);
            len += updates;
        }

        let batches = req.inputs.iter().map(|x| x.id).collect::<BTreeSet<_>>();
        let input = input_id_range(batches);

        Ok(CompactRes {
            output: HollowBatch::new(
                req.desc.clone(),
                all_parts,
                len,
                all_run_meta,
                all_run_splits,
            ),
            input,
        })
    }

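    /// Applies a compaction result to the machine's state, updating metrics and, if the
    /// merge result could not be applied, deleting the now-unreferenced output parts
    /// from blob.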
    pub async fn apply(
        res: FueledMergeRes<T>,
        metrics: &Metrics,
        machine: &Machine<K, V, T, D>,
    ) -> Result<RoutineMaintenance, anyhow::Error> {
        let (apply_merge_result, maintenance) = machine.merge_res(&res).await;

        match &apply_merge_result {
            ApplyMergeResult::AppliedExact => {
                metrics.compaction.applied.inc();
                metrics.compaction.applied_exact_match.inc();
                machine.applier.shard_metrics.compaction_applied.inc();
            }
            ApplyMergeResult::AppliedSubset => {
                metrics.compaction.applied.inc();
                metrics.compaction.applied_subset_match.inc();
                machine.applier.shard_metrics.compaction_applied.inc();
            }
            ApplyMergeResult::NotAppliedNoMatch
            | ApplyMergeResult::NotAppliedInvalidSince
            | ApplyMergeResult::NotAppliedTooManyUpdates => {
                if let ApplyMergeResult::NotAppliedTooManyUpdates = &apply_merge_result {
                    metrics.compaction.not_applied_too_many_updates.inc();
                }
                metrics.compaction.noop.inc();
                let mut part_deletes = PartDeletes::default();
                for part in &res.output.parts {
                    part_deletes.add(part);
                }
                part_deletes
                    .delete(
                        machine.applier.state_versions.blob.as_ref(),
                        machine.shard_id(),
                        GC_BLOB_DELETE_CONCURRENCY_LIMIT.get(&machine.applier.cfg),
                        &*metrics,
                        &metrics.retries.external.compaction_noop_delete,
                    )
                    .await;
            }
        };

        Ok(maintenance)
    }

    /// Compacts input batches in bounded memory.
    ///
    /// The memory bound is broken into pieces:
    ///     1. in-progress work
    ///     2. fetching parts from runs
    ///     3. additional in-flight requests to Blob
    ///
    /// 1. In-progress work is bounded by 2 * [BatchBuilderConfig::blob_target_size]. This
    ///    usage is met at two mutually exclusive moments:
    ///   * When reading in a part, we hold the columnar format in memory while writing its
    ///     contents into a heap.
    ///   * When writing a part, we hold a temporary updates buffer while encoding/writing
    ///     it into a columnar format for Blob.
    ///
    /// 2. When compacting runs, only 1 part from each one is held in memory at a time.
    ///    Compaction will determine an appropriate number of runs to compact together
    ///    given the memory bound and accounting for the reservation in (1). A minimum
    ///    of 2 * [BatchBuilderConfig::blob_target_size] of memory is expected, to be
    ///    able to at least have the capacity to compact two runs together at a time,
    ///    and more runs will be compacted together if more memory is available.
    ///
    /// 3. If there is excess memory after accounting for (1) and (2), we increase the
    ///    number of outstanding parts we can keep in-flight to Blob.
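    ///
    /// As a rough, illustrative example (not the actual defaults): with a memory bound
    /// of 1 GiB and a blob_target_size of 128 MiB, (1) reserves 256 MiB, leaving 768 MiB
    /// for fetching runs in (2); any of that 768 MiB not needed by the chosen runs is
    /// given to (3) as extra outstanding parts, in increments of blob_target_size.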
    pub fn compact_stream(
        cfg: CompactConfig,
        blob: Arc<dyn Blob>,
        metrics: Arc<Metrics>,
        shard_metrics: Arc<ShardMetrics>,
        isolated_runtime: Arc<IsolatedRuntime>,
        req: CompactReq<T>,
        write_schemas: Schemas<K, V>,
        incremental_enabled: bool,
    ) -> impl Stream<Item = Result<FueledMergeRes<T>, anyhow::Error>> {
        async_stream::stream! {
            let () = Self::validate_req(&req)?;

            // We introduced a fast-path optimization in https://github.com/MaterializeInc/materialize/pull/15363
            // but had to revert it due to a very scary bug. Here we count how many of our compaction reqs
            // could be eligible for the optimization to better understand whether it's worth trying to
            // reintroduce it.
            let mut single_nonempty_batch = None;
            for batch in &req.inputs {
                if batch.batch.len > 0 {
                    match single_nonempty_batch {
                        None => single_nonempty_batch = Some(batch),
                        Some(_previous_nonempty_batch) => {
                            single_nonempty_batch = None;
                            break;
                        }
                    }
                }
            }
            if let Some(single_nonempty_batch) = single_nonempty_batch {
                if single_nonempty_batch.batch.run_splits.len() == 0
                    && single_nonempty_batch.batch.desc.since() != &Antichain::from_elem(T::minimum())
                {
                    metrics.compaction.fast_path_eligible.inc();
                }
            }

            // Reserve space for the in-progress part to be held in both its in-mem
            // representation and its columnar form -
            let in_progress_part_reserved_memory_bytes = 2 * cfg.batch.blob_target_size;
            // - then remaining memory will go towards pulling down as many runs as we can.
            // We'll always do at least two runs per chunk, which means we may go over this limit
            // if parts are large or the limit is low... though we do at least increment a metric
            // when that happens.
            let run_reserved_memory_bytes = cfg
                .compaction_memory_bound_bytes
                .saturating_sub(in_progress_part_reserved_memory_bytes);

            let chunked_runs = Self::chunk_runs(
                &req,
                &cfg,
                &*metrics,
                run_reserved_memory_bytes,
                req.desc.since()
            );
            let total_chunked_runs = chunked_runs.len();

            let parts_before = req.inputs.iter().map(|x| x.batch.parts.len()).sum::<usize>();
            let parts_after = chunked_runs.iter().flat_map(|(_, _, runs, _)| runs.iter().map(|(_, _, parts)| parts.len())).sum::<usize>();
            assert_eq!(parts_before, parts_after, "chunking should not change the number of parts");

            for (applied, (input, desc, runs, run_chunk_max_memory_usage)) in
                chunked_runs.into_iter().enumerate()
            {
                metrics.compaction.chunks_compacted.inc();
                metrics
                    .compaction
                    .runs_compacted
                    .inc_by(u64::cast_from(runs.len()));

                // given the runs we actually have in our batch, we might have extra memory
                // available. we reserved enough space to always have 1 in-progress part in
                // flight, but if we have excess, we can use it to increase our write parallelism
                let extra_outstanding_parts = (run_reserved_memory_bytes
                    .saturating_sub(run_chunk_max_memory_usage))
                    / cfg.batch.blob_target_size;
                let mut run_cfg = cfg.clone();
                run_cfg.batch.batch_builder_max_outstanding_parts = 1 + extra_outstanding_parts;
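                // e.g. (illustrative numbers): with 768 MiB reserved for runs, a chunk
                // that needs 200 MiB, and a 128 MiB blob_target_size, this allows
                // 1 + 4 = 5 outstanding parts.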

                let desc = if incremental_enabled {
                    desc
                } else {
                    req.desc.clone()
                };

                let runs = runs.iter()
                    .map(|(desc, meta, run)| (*desc, *meta, *run))
                    .collect::<Vec<_>>();

                let batch = Self::compact_runs(
                    &run_cfg,
                    &req.shard_id,
                    &desc,
                    runs,
                    Arc::clone(&blob),
                    Arc::clone(&metrics),
                    Arc::clone(&shard_metrics),
                    Arc::clone(&isolated_runtime),
                    write_schemas.clone(),
                )
                .await?;

                assert!(
                    (batch.len == 0 && batch.parts.len() == 0) || (batch.len > 0 && batch.parts.len() > 0),
                    "updates={}, parts={}",
                    batch.len,
                    batch.parts.len(),
                );

                // Set up active compaction metadata
                let clock = cfg.now.clone();
                let active_compaction = if applied < total_chunked_runs - 1 {
                    Some(ActiveCompaction { start_ms: clock() })
                } else {
                    None
                };

                let res = CompactRes {
                    output: batch,
                    input,
                };

                let res = FueledMergeRes {
                    output: res.output,
                    new_active_compaction: active_compaction,
                    input: res.input,
                };

                yield Ok(res);
            }
        }
    }

    /// Compacts the input batches together, returning a single compacted batch.
    /// Under the hood this just calls [Self::compact_stream] and
    /// [Self::compact_all], but it is a convenience method that allows
    /// the caller to not have to deal with the streaming API.
    pub async fn compact(
        cfg: CompactConfig,
        blob: Arc<dyn Blob>,
        metrics: Arc<Metrics>,
        shard_metrics: Arc<ShardMetrics>,
        isolated_runtime: Arc<IsolatedRuntime>,
        req: CompactReq<T>,
        write_schemas: Schemas<K, V>,
    ) -> Result<CompactRes<T>, anyhow::Error> {
        let stream = Self::compact_stream(
            cfg,
            Arc::clone(&blob),
            Arc::clone(&metrics),
            Arc::clone(&shard_metrics),
            Arc::clone(&isolated_runtime),
            req.clone(),
            write_schemas,
            false,
        );

        Self::compact_all(stream, req).await
    }

    /// Chunks runs with the following rules:
    /// 1. Runs from multiple batches are allowed to be mixed as long as _every_ run in the
    ///    batch is present in the chunk.
    /// 2. Otherwise, runs are split into chunks of runs from a single batch.
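    ///
    /// For example (illustrative, depending on the memory budget): two small batches
    /// with one run each may be chunked together, while a single batch whose runs don't
    /// all fit in memory is split into several chunks, each containing a subset of that
    /// batch's runs.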
    fn chunk_runs<'a>(
        req: &'a CompactReq<T>,
        cfg: &CompactConfig,
        metrics: &Metrics,
        run_reserved_memory_bytes: usize,
        since: &Antichain<T>,
    ) -> Vec<(
        CompactionInput,
        Description<T>,
        Vec<(&'a Description<T>, &'a RunMeta, &'a [RunPart<T>])>,
        usize,
    )> {
        // Assert that all of the inputs are contiguous / can be compacted together.
        let _ = input_id_range(req.inputs.iter().map(|x| x.id).collect());

        // Iterate through batches by spine id.
        let mut batches: Vec<_> = req.inputs.iter().map(|x| (x.id, &*x.batch)).collect();
        batches.sort_by_key(|(id, _)| *id);

        let mut chunks = vec![];
        let mut current_chunk_ids = BTreeSet::new();
        let mut current_chunk_descs = Vec::new();
        let mut current_chunk_runs = vec![];
        let mut current_chunk_max_memory_usage = 0;

        fn max_part_bytes<T>(parts: &[RunPart<T>], cfg: &CompactConfig) -> usize {
            parts
                .iter()
                .map(|p| p.max_part_bytes())
                .max()
                .unwrap_or(cfg.batch.blob_target_size)
        }

        fn desc_range<T: Timestamp>(
            descs: impl IntoIterator<Item = Description<T>>,
            since: Antichain<T>,
        ) -> Description<T> {
            let mut descs = descs.into_iter();
            let first = descs.next().expect("non-empty set of descriptions");
            let lower = first.lower().clone();
            let mut upper = first.upper().clone();
            for desc in descs {
                assert_eq!(&upper, desc.lower());
                upper = desc.upper().clone();
            }
            let upper = upper.clone();
            Description::new(lower, upper, since)
        }

        for (spine_id, batch) in batches {
            let batch_size = batch
                .runs()
                .map(|(_, parts)| max_part_bytes(parts, cfg))
                .sum::<usize>();

            let num_runs = batch.run_meta.len();

            let runs = batch.runs().flat_map(|(meta, parts)| {
                if meta.order.unwrap_or(RunOrder::Codec) == cfg.batch.preferred_order {
                    Either::Left(std::iter::once((&batch.desc, meta, parts)))
                } else {
                    // The downstream consolidation step will handle a long run that's not in
                    // the desired order by splitting it up into many single-element runs. This preserves
                    // correctness, but it means that we may end up needing to iterate through
                    // many more parts concurrently than expected, increasing memory use. Instead,
                    // we break up those runs into individual batch parts, fetching hollow runs as
                    // necessary, before they're grouped together to be passed to consolidation.
                    // The downside is that this breaks the usual property that compaction produces
                    // fewer runs than it takes in. This should generally be resolved by future
                    // runs of compaction.
                    soft_assert_or_log!(
                        !parts.iter().any(|r| matches!(r, RunPart::Many(_))),
                        "unexpected out-of-order hollow run"
                    );
                    Either::Right(
                        parts
                            .iter()
                            .map(move |p| (&batch.desc, meta, std::slice::from_ref(p))),
                    )
                }
            });

            // Combine the given batch into the current chunk:
            // - if they fit within the memory budget, or
            // - if each has at most a single run, since otherwise compaction wouldn't make progress.
            if current_chunk_max_memory_usage + batch_size <= run_reserved_memory_bytes
                || current_chunk_runs.len() + num_runs <= 2
            {
                if current_chunk_max_memory_usage + batch_size > run_reserved_memory_bytes {
                    // We've chosen to merge these batches together despite being over budget,
                    // which should be rare.
                    metrics.compaction.memory_violations.inc();
                }
                current_chunk_ids.insert(spine_id);
                current_chunk_descs.push(batch.desc.clone());
                current_chunk_runs.extend(runs);
                current_chunk_max_memory_usage += batch_size;
                continue;
            }

            // Otherwise, we cannot mix this batch partially. Flush any existing mixed chunk first.
            if !current_chunk_ids.is_empty() {
                chunks.push((
                    input_id_range(std::mem::take(&mut current_chunk_ids)),
                    desc_range(mem::take(&mut current_chunk_descs), since.clone()),
                    std::mem::take(&mut current_chunk_runs),
                    current_chunk_max_memory_usage,
                ));
                current_chunk_max_memory_usage = 0;
            }

            // If the batch fits within limits, try to accumulate future batches into it.
            if batch_size <= run_reserved_memory_bytes {
                current_chunk_ids.insert(spine_id);
                current_chunk_descs.push(batch.desc.clone());
                current_chunk_runs.extend(runs);
                current_chunk_max_memory_usage += batch_size;
                continue;
            }

            // This batch is too large to compact with others, or even in a single go.
            // Process this batch alone, splitting into single-batch chunks as needed.
            let mut run_iter = runs.into_iter().peekable();
            debug_assert!(current_chunk_ids.is_empty());
            debug_assert!(current_chunk_descs.is_empty());
            debug_assert!(current_chunk_runs.is_empty());
            debug_assert_eq!(current_chunk_max_memory_usage, 0);
            let mut current_chunk_run_ids = BTreeSet::new();

            while let Some((desc, meta, parts)) = run_iter.next() {
                let run_size = max_part_bytes(parts, cfg);
                current_chunk_runs.push((desc, meta, parts));
                current_chunk_max_memory_usage += run_size;
                current_chunk_run_ids.extend(meta.id);

                if let Some((_, _meta, next_parts)) = run_iter.peek() {
                    let next_size = max_part_bytes(next_parts, cfg);
                    if current_chunk_max_memory_usage + next_size > run_reserved_memory_bytes {
                        // If the current chunk only has one run, record a memory violation metric.
                        if current_chunk_runs.len() == 1 {
                            metrics.compaction.memory_violations.inc();
                            continue;
                        }
                        // Flush the current chunk and start a new one.
                        chunks.push((
                            CompactionInput::PartialBatch(
                                spine_id,
                                mem::take(&mut current_chunk_run_ids),
                            ),
                            desc_range([batch.desc.clone()], since.clone()),
                            std::mem::take(&mut current_chunk_runs),
                            current_chunk_max_memory_usage,
                        ));
                        current_chunk_max_memory_usage = 0;
                    }
                }
            }

            if !current_chunk_runs.is_empty() {
                chunks.push((
                    CompactionInput::PartialBatch(spine_id, mem::take(&mut current_chunk_run_ids)),
                    desc_range([batch.desc.clone()], since.clone()),
                    std::mem::take(&mut current_chunk_runs),
                    current_chunk_max_memory_usage,
                ));
                current_chunk_max_memory_usage = 0;
            }
        }

        // If we ended with a mixed-batch chunk in progress, flush it.
        if !current_chunk_ids.is_empty() {
            chunks.push((
                input_id_range(current_chunk_ids),
                desc_range(current_chunk_descs, since.clone()),
                current_chunk_runs,
                current_chunk_max_memory_usage,
            ));
        }

        chunks
    }

    /// Compacts runs together. If the input runs are sorted, a single run will be created as output.
    ///
    /// Maximum possible memory usage is `(# runs + 2) * [crate::PersistConfig::blob_target_size]`
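    ///
    /// For example (illustrative numbers): compacting 6 runs with a 128 MiB
    /// blob_target_size may use up to (6 + 2) * 128 MiB = 1 GiB of memory.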
    pub(crate) async fn compact_runs(
        cfg: &CompactConfig,
        shard_id: &ShardId,
        desc: &Description<T>,
        runs: Vec<(&Description<T>, &RunMeta, &[RunPart<T>])>,
        blob: Arc<dyn Blob>,
        metrics: Arc<Metrics>,
        shard_metrics: Arc<ShardMetrics>,
        isolated_runtime: Arc<IsolatedRuntime>,
        write_schemas: Schemas<K, V>,
    ) -> Result<HollowBatch<T>, anyhow::Error> {
        // TODO: Figure out a more principled way to allocate our memory budget.
        // Currently, we give any excess budget to write parallelism. If we had
        // to pick between 100% towards writes vs 100% towards reads, then reads
        // is almost certainly better, but the ideal is probably somewhere in
        // between the two.
        //
        // For now, invent some extra budget out of thin air for prefetch.
        let prefetch_budget_bytes = 2 * cfg.batch.blob_target_size;

        let mut timings = Timings::default();

        let mut batch_cfg = cfg.batch.clone();

        // Use compaction as a method of getting inline writes out of state, to
        // make room for more inline writes. We could instead do this at the end
        // of compaction by flushing out the batch, but doing it here based on
        // the config allows BatchBuilder to do its normal pipelining of writes.
        batch_cfg.inline_writes_single_max_bytes = 0;

        let parts = BatchParts::new_ordered::<D>(
            batch_cfg,
            cfg.batch.preferred_order,
            Arc::clone(&metrics),
            Arc::clone(&shard_metrics),
            *shard_id,
            Arc::clone(&blob),
            Arc::clone(&isolated_runtime),
            &metrics.compaction.batch,
        );
        let mut batch = BatchBuilderInternal::<K, V, T, D>::new(
            cfg.batch.clone(),
            parts,
            Arc::clone(&metrics),
            write_schemas.clone(),
            Arc::clone(&blob),
            shard_id.clone(),
            cfg.version.clone(),
        );

        let mut consolidator = Consolidator::new(
            format!(
                "{}[lower={:?},upper={:?}]",
                shard_id,
                desc.lower().elements(),
                desc.upper().elements()
            ),
            cfg.fetch_config.clone(),
            *shard_id,
            StructuredSort::<K, V, T, D>::new(write_schemas.clone()),
            blob,
            Arc::clone(&metrics),
            shard_metrics,
            metrics.read.compaction.clone(),
            FetchBatchFilter::Compaction {
                since: desc.since().clone(),
            },
            None,
            prefetch_budget_bytes,
        );

        for (desc, meta, parts) in runs {
            consolidator.enqueue_run(desc, meta, parts.iter().cloned());
        }

        let remaining_budget = consolidator.start_prefetches();
        if remaining_budget.is_none() {
            metrics.compaction.not_all_prefetched.inc();
        }

        loop {
            let mut chunks = vec![];
            let mut total_bytes = 0;
            // We attempt to pull chunks out of the consolidator that match our target size,
            // but it's possible that we may get smaller chunks... for example, if not all
            // parts have been fetched yet. Loop until we've got enough data to justify flushing
            // it out to blob (or we run out of data.)
            while total_bytes < cfg.batch.blob_target_size {
                let fetch_start = Instant::now();
                let Some(chunk) = consolidator
                    .next_chunk(
                        cfg.compaction_yield_after_n_updates,
                        cfg.batch.blob_target_size - total_bytes,
                    )
                    .await?
                else {
                    break;
                };
                timings.part_fetching += fetch_start.elapsed();
                total_bytes += chunk.goodbytes();
                chunks.push(chunk);
                tokio::task::yield_now().await;
            }

            // In the hopefully-common case of a single chunk, this will not copy.
            let Some(updates) = Part::concat(&chunks).expect("compaction produces well-typed data")
            else {
                break;
            };
            batch.flush_part(desc.clone(), updates).await;
        }
        let mut batch = batch.finish(desc.clone()).await?;

        // We use compaction as a method of getting inline writes out of state,
        // to make room for more inline writes. This happens above by overriding
        // the inline writes threshold config. This is a bit action-at-a-distance,
        // so defensively detect if this breaks here and log and correct it if so.
        let has_inline_parts = batch.batch.parts.iter().any(|x| x.is_inline());
        if has_inline_parts {
            error!(%shard_id, ?cfg, "compaction result unexpectedly had inline writes");
            let () = batch
                .flush_to_blob(
                    &cfg.batch,
                    &metrics.compaction.batch,
                    &isolated_runtime,
                    &write_schemas,
                )
                .await;
        }

        timings.record(&metrics);
        Ok(batch.into_hollow_batch())
    }

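    /// Checks that the inputs tile the request's description exactly: for example,
    /// inputs with bounds `[0, 5)` and `[5, 10)` are a valid merge into an output with
    /// bounds `[0, 10)`, while a gap, an overlap, or an input whose since is beyond the
    /// output's since is rejected. (Illustrative bounds.)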
    fn validate_req(req: &CompactReq<T>) -> Result<(), anyhow::Error> {
        let mut frontier = req.desc.lower();
        for input in req.inputs.iter() {
            if PartialOrder::less_than(req.desc.since(), input.batch.desc.since()) {
                return Err(anyhow!(
                    "output since {:?} must be at or in advance of input since {:?}",
                    req.desc.since(),
                    input.batch.desc.since()
                ));
            }
            if frontier != input.batch.desc.lower() {
                return Err(anyhow!(
                    "invalid merge of non-consecutive batches {:?} vs {:?}",
                    frontier,
                    input.batch.desc.lower()
                ));
            }
            frontier = input.batch.desc.upper();
        }
        if frontier != req.desc.upper() {
            return Err(anyhow!(
                "invalid merge of non-consecutive batches {:?} vs {:?}",
                frontier,
                req.desc.upper()
            ));
        }
        Ok(())
    }
}

#[derive(Debug, Default)]
struct Timings {
    part_fetching: Duration,
    heap_population: Duration,
}

impl Timings {
    fn record(self, metrics: &Metrics) {
        // intentionally deconstruct so we don't forget to consider each field
        let Timings {
            part_fetching,
            heap_population,
        } = self;

        metrics
            .compaction
            .steps
            .part_fetch_seconds
            .inc_by(part_fetching.as_secs_f64());
        metrics
            .compaction
            .steps
            .heap_population_seconds
            .inc_by(heap_population.as_secs_f64());
    }
}

#[cfg(test)]
mod tests {
    use mz_dyncfg::ConfigUpdates;
    use mz_ore::{assert_contains, assert_err};
    use mz_persist_types::codec_impls::StringSchema;
    use timely::progress::Antichain;

    use crate::PersistLocation;
    use crate::batch::BLOB_TARGET_SIZE;
    use crate::internal::trace::SpineId;
    use crate::tests::{all_ok, expect_fetch_part, new_test_client_cache};

    use super::*;

    // A regression test for a bug caught during development of materialize#13160 (never
    // made it to main) where batches written by compaction would always have a
    // since of the minimum timestamp.
    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
    async fn regression_minimum_since(dyncfgs: ConfigUpdates) {
        let data = vec![
            (("0".to_owned(), "zero".to_owned()), 0, 1),
            (("0".to_owned(), "zero".to_owned()), 1, -1),
            (("1".to_owned(), "one".to_owned()), 1, 1),
        ];

        let cache = new_test_client_cache(&dyncfgs);
        cache.cfg.set_config(&BLOB_TARGET_SIZE, 100);
        let (mut write, _) = cache
            .open(PersistLocation::new_in_mem())
            .await
            .expect("client construction failed")
            .expect_open::<String, String, u64, i64>(ShardId::new())
            .await;
        let b0 = write
            .expect_batch(&data[..1], 0, 1)
            .await
            .into_hollow_batch();
        let b1 = write
            .expect_batch(&data[1..], 1, 2)
            .await
            .into_hollow_batch();

        let req = CompactReq {
            shard_id: write.machine.shard_id(),
            desc: Description::new(
                b0.desc.lower().clone(),
                b1.desc.upper().clone(),
                Antichain::from_elem(10u64),
            ),
            inputs: vec![
                IdHollowBatch {
                    batch: Arc::new(b0),
                    id: SpineId(0, 1),
                },
                IdHollowBatch {
                    batch: Arc::new(b1),
                    id: SpineId(1, 2),
                },
            ],
        };
        let schemas = Schemas {
            id: None,
            key: Arc::new(StringSchema),
            val: Arc::new(StringSchema),
        };
        let res = Compactor::<String, String, u64, i64>::compact(
            CompactConfig::new(&write.cfg, write.shard_id()),
            Arc::clone(&write.blob),
            Arc::clone(&write.metrics),
            write.metrics.shards.shard(&write.machine.shard_id(), ""),
            Arc::new(IsolatedRuntime::new_for_tests()),
            req.clone(),
            schemas.clone(),
        )
        .await
        .expect("compaction failed");

        assert_eq!(res.output.desc, req.desc);
        assert_eq!(res.output.len, 1);
        assert_eq!(res.output.part_count(), 1);
        let part = res.output.parts[0].expect_hollow_part();
        let (part, updates) = expect_fetch_part(
            write.blob.as_ref(),
            &part.key.complete(&write.machine.shard_id()),
            &write.metrics,
            &schemas,
        )
        .await;
        assert_eq!(part.desc, res.output.desc);
        assert_eq!(updates, all_ok(&data, 10));
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
    async fn disable_compaction(dyncfgs: ConfigUpdates) {
        let data = [
            (("0".to_owned(), "zero".to_owned()), 0, 1),
            (("0".to_owned(), "zero".to_owned()), 1, -1),
            (("1".to_owned(), "one".to_owned()), 1, 1),
        ];

        let cache = new_test_client_cache(&dyncfgs);
        cache.cfg.set_config(&BLOB_TARGET_SIZE, 100);
        let (mut write, _) = cache
            .open(PersistLocation::new_in_mem())
            .await
            .expect("client construction failed")
            .expect_open::<String, String, u64, i64>(ShardId::new())
            .await;
        let b0 = write
            .expect_batch(&data[..1], 0, 1)
            .await
            .into_hollow_batch();
        let b1 = write
            .expect_batch(&data[1..], 1, 2)
            .await
            .into_hollow_batch();

        let req = CompactReq {
            shard_id: write.machine.shard_id(),
            desc: Description::new(
                b0.desc.lower().clone(),
                b1.desc.upper().clone(),
                Antichain::from_elem(10u64),
            ),
            inputs: vec![
                IdHollowBatch {
                    batch: Arc::new(b0),
                    id: SpineId(0, 1),
                },
                IdHollowBatch {
                    batch: Arc::new(b1),
                    id: SpineId(1, 2),
                },
            ],
        };
        write.cfg.set_config(&COMPACTION_HEURISTIC_MIN_INPUTS, 1);
        let compactor = write.compact.as_ref().expect("compaction hard disabled");

        write.cfg.disable_compaction();
        let result = compactor
            .compact_and_apply_background(req.clone(), &write.machine)
            .expect("listener")
            .await
            .expect("channel closed");
        assert_err!(result);
        assert_contains!(result.unwrap_err().to_string(), "compaction disabled");

        write.cfg.enable_compaction();
        compactor
            .compact_and_apply_background(req, &write.machine)
            .expect("listener")
            .await
            .expect("channel closed")
            .expect("compaction success");

        // Make sure our CYA dyncfg works.
        let data2 = [
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("2".to_owned(), "two".to_owned()), 3, -1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
        ];

        let b2 = write
            .expect_batch(&data2[..1], 2, 3)
            .await
            .into_hollow_batch();
        let b3 = write
            .expect_batch(&data2[1..], 3, 4)
            .await
            .into_hollow_batch();

        let req = CompactReq {
            shard_id: write.machine.shard_id(),
            desc: Description::new(
                b2.desc.lower().clone(),
                b3.desc.upper().clone(),
                Antichain::from_elem(20u64),
            ),
            inputs: vec![
                IdHollowBatch {
                    batch: Arc::new(b2),
                    id: SpineId(0, 1),
                },
                IdHollowBatch {
                    batch: Arc::new(b3),
                    id: SpineId(1, 2),
                },
            ],
        };
        let compactor = write.compact.as_ref().expect("compaction hard disabled");

        // When the dyncfg is set to false we should ignore the process flag.
        write.cfg.set_config(&COMPACTION_CHECK_PROCESS_FLAG, false);
        write.cfg.disable_compaction();
        // Compaction still succeeded!
        compactor
            .compact_and_apply_background(req, &write.machine)
            .expect("listener")
            .await
            .expect("channel closed")
            .expect("compaction success");
    }
}
1337}