// mz_persist_client/internal/compact.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::collections::BTreeSet;
use std::fmt::Debug;
use std::marker::PhantomData;
use std::mem;
use std::sync::Arc;
use std::time::{Duration, Instant};

use anyhow::anyhow;
use differential_dataflow::difference::Monoid;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::trace::Description;
use futures::{Stream, pin_mut};
use futures_util::StreamExt;
use itertools::Either;
use mz_dyncfg::Config;
use mz_ore::cast::CastFrom;
use mz_ore::error::ErrorExt;
use mz_ore::now::NowFn;
use mz_ore::soft_assert_or_log;
use mz_persist::location::Blob;
use mz_persist_types::part::Part;
use mz_persist_types::{Codec, Codec64};
use timely::PartialOrder;
use timely::progress::{Antichain, Timestamp};
use tokio::sync::mpsc::Sender;
use tokio::sync::{TryAcquireError, mpsc, oneshot};
use tracing::{Instrument, Span, debug, debug_span, error, trace, warn};

use crate::async_runtime::IsolatedRuntime;
use crate::batch::{BatchBuilderConfig, BatchBuilderInternal, BatchParts, PartDeletes};
use crate::cfg::{
    COMPACTION_HEURISTIC_MIN_INPUTS, COMPACTION_HEURISTIC_MIN_PARTS,
    COMPACTION_HEURISTIC_MIN_UPDATES, COMPACTION_MEMORY_BOUND_BYTES,
    GC_BLOB_DELETE_CONCURRENCY_LIMIT, MiB,
};
use crate::fetch::{FetchBatchFilter, FetchConfig};
use crate::internal::encoding::Schemas;
use crate::internal::gc::GarbageCollector;
use crate::internal::machine::Machine;
use crate::internal::maintenance::RoutineMaintenance;
use crate::internal::metrics::ShardMetrics;
use crate::internal::state::{HollowBatch, RunMeta, RunOrder, RunPart};
use crate::internal::trace::{
    ActiveCompaction, ApplyMergeResult, CompactionInput, FueledMergeRes, IdHollowBatch, SpineId,
    id_range,
};
use crate::iter::{Consolidator, StructuredSort};
use crate::{Metrics, PersistConfig, ShardId};
/// A request for compaction.
///
/// This is similar to FueledMergeReq, but intentionally a different type. If we
/// move compaction to an rpc server, this one will become a protobuf; the type
/// parameters will become names of codecs to look up in some registry.
#[derive(Debug, Clone)]
pub struct CompactReq<T> {
    /// The shard the input and output batches belong to.
    pub shard_id: ShardId,
    /// A description for the output batch.
    pub desc: Description<T>,
    /// The updates to include in the output batch. Any data in these outside of
    /// the output description's bounds should be ignored.
    pub inputs: Vec<IdHollowBatch<T>>,
}

/// A response from compaction.
#[derive(Debug)]
pub struct CompactRes<T> {
    /// The compacted batch.
    pub output: HollowBatch<T>,
    /// The runs that were compacted together to produce the output batch.
    pub input: CompactionInput,
}

/// A snapshot of dynamic configs to make it easier to reason about an
/// individual run of compaction.
#[derive(Debug, Clone)]
pub struct CompactConfig {
    pub(crate) compaction_memory_bound_bytes: usize,
    pub(crate) compaction_yield_after_n_updates: usize,
    pub(crate) version: semver::Version,
    pub(crate) batch: BatchBuilderConfig,
    pub(crate) fetch_config: FetchConfig,
    pub(crate) now: NowFn,
}

impl CompactConfig {
    /// Initialize the compaction config from Persist configuration.
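    ///
    /// A minimal usage sketch (illustrative only; the `persist_cfg` and
    /// `shard_id` bindings are assumed to already be in hand):
    ///
    /// ```ignore
    /// let compact_cfg = CompactConfig::new(&persist_cfg, shard_id);
    /// ```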
    pub fn new(value: &PersistConfig, shard_id: ShardId) -> Self {
        CompactConfig {
            compaction_memory_bound_bytes: COMPACTION_MEMORY_BOUND_BYTES.get(value),
            compaction_yield_after_n_updates: value.compaction_yield_after_n_updates,
            version: value.build_version.clone(),
            batch: BatchBuilderConfig::new(value, shard_id),
            fetch_config: FetchConfig::from_persist_config(value),
            now: value.now.clone(),
        }
    }
}

/// A service for performing physical and logical compaction.
///
/// This will possibly be called over RPC in the future. Physical compaction is
/// merging adjacent batches. Logical compaction is advancing timestamps to a
/// new since and consolidating the resulting updates.
#[derive(Debug)]
pub struct Compactor<K, V, T, D> {
    cfg: PersistConfig,
    metrics: Arc<Metrics>,
    sender: Sender<(
        Instant,
        CompactReq<T>,
        Machine<K, V, T, D>,
        oneshot::Sender<Result<(), anyhow::Error>>,
    )>,
    _phantom: PhantomData<fn() -> D>,
}

impl<K, V, T, D> Clone for Compactor<K, V, T, D> {
    fn clone(&self) -> Self {
        Compactor {
            cfg: self.cfg.clone(),
            metrics: Arc::clone(&self.metrics),
            sender: self.sender.clone(),
            _phantom: Default::default(),
        }
    }
}

/// In Compactor::compact_and_apply_background, the minimum amount of time to
/// allow a compaction request to run before timing it out. A request may be
/// given a timeout greater than this value depending on the inputs' size.
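///
/// As an illustrative sketch (mirroring `compact_and_apply` below, not a
/// separate implementation), the effective timeout is the larger of this
/// minimum and roughly one second per MiB of input:
///
/// ```ignore
/// let timeout = Duration::max(
///     minimum_timeout, // the 90s default below, unless overridden
///     Duration::from_secs(u64::cast_from(total_input_bytes / MiB)),
/// );
/// ```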
pub(crate) const COMPACTION_MINIMUM_TIMEOUT: Config<Duration> = Config::new(
    "persist_compaction_minimum_timeout",
    Duration::from_secs(90),
    "\
    The minimum amount of time to allow a persist compaction request to run \
    before timing it out (Materialize).",
);

pub(crate) const COMPACTION_CHECK_PROCESS_FLAG: Config<bool> = Config::new(
    "persist_compaction_check_process_flag",
    true,
    "Whether Compactor will obey the process_requests flag in PersistConfig, \
        which allows dynamically disabling compaction. If false, all compaction requests will be processed.",
);

/// Create a [`CompactionInput::IdRange`] from a set of `SpineId`s.
fn input_id_range(ids: BTreeSet<SpineId>) -> CompactionInput {
    let id = id_range(ids);

    CompactionInput::IdRange(id)
}

impl<K, V, T, D> Compactor<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64 + Sync,
    D: Monoid + Ord + Codec64 + Send + Sync,
{
    pub fn new(
        cfg: PersistConfig,
        metrics: Arc<Metrics>,
        gc: GarbageCollector<K, V, T, D>,
    ) -> Self {
        let (compact_req_sender, mut compact_req_receiver) = mpsc::channel::<(
            Instant,
            CompactReq<T>,
            Machine<K, V, T, D>,
            oneshot::Sender<Result<(), anyhow::Error>>,
        )>(cfg.compaction_queue_size);
        let concurrency_limit = Arc::new(tokio::sync::Semaphore::new(
            cfg.compaction_concurrency_limit,
        ));
        let check_process_requests = COMPACTION_CHECK_PROCESS_FLAG.handle(&cfg.configs);
        let process_requests = Arc::clone(&cfg.compaction_process_requests);

        // spin off a single task responsible for executing compaction requests.
        // work is enqueued into the task through a channel
        let _worker_handle = mz_ore::task::spawn(|| "PersistCompactionScheduler", async move {
            while let Some((enqueued, req, machine, completer)) = compact_req_receiver.recv().await
            {
                assert_eq!(req.shard_id, machine.shard_id());
                let metrics = Arc::clone(&machine.applier.metrics);

                // Only allow skipping compaction requests if the dyncfg is enabled.
                if check_process_requests.get()
                    && !process_requests.load(std::sync::atomic::Ordering::Relaxed)
                {
                    // Respond to the requester, track in our metrics, and log
                    // that compaction is disabled.
                    let _ = completer.send(Err(anyhow::anyhow!("compaction disabled")));
                    metrics.compaction.disabled.inc();
                    tracing::warn!(shard_id = ?req.shard_id, "Dropping compaction request on the floor.");

                    continue;
                }

                let permit = {
                    let inner = Arc::clone(&concurrency_limit);
                    // perform a non-blocking attempt to acquire a permit so we can
                    // record how often we're ever blocked on the concurrency limit
                    match inner.try_acquire_owned() {
                        Ok(permit) => permit,
                        Err(TryAcquireError::NoPermits) => {
                            metrics.compaction.concurrency_waits.inc();
                            Arc::clone(&concurrency_limit)
                                .acquire_owned()
                                .await
                                .expect("semaphore is never closed")
                        }
                        Err(TryAcquireError::Closed) => {
                            // should never happen in practice. the semaphore is
                            // never explicitly closed, nor will it close on Drop
                            warn!("semaphore for shard {} is closed", machine.shard_id());
                            continue;
                        }
                    }
                };
                metrics
                    .compaction
                    .queued_seconds
                    .inc_by(enqueued.elapsed().as_secs_f64());

                let compact_span =
                    debug_span!(parent: None, "compact::apply", shard_id=%machine.shard_id());
                compact_span.follows_from(&Span::current());
                let gc = gc.clone();
                mz_ore::task::spawn(|| "PersistCompactionWorker", async move {
                    let res = Self::compact_and_apply(&machine, req)
                        .instrument(compact_span)
                        .await;

                    match res {
                        Ok(maintenance) => maintenance.start_performing(&machine, &gc),
                        Err(err) => {
                            debug!(shard_id =? machine.shard_id(), "compaction failed: {err:#}")
                        }
                    }

                    // we can safely ignore errors here, it's possible the caller
                    // wasn't interested in waiting and dropped their receiver
                    let _ = completer.send(Ok(()));

                    // moves `permit` into async scope so it can be dropped upon completion
                    drop(permit);
                });
            }
        });

        Compactor {
            cfg,
            metrics,
            sender: compact_req_sender,
            _phantom: PhantomData,
        }
    }

    /// Enqueues a [CompactReq] to be consumed by the compaction background task when available.
    ///
    /// Returns a receiver that indicates when compaction has completed. The receiver can be
    /// safely dropped at any time if the caller does not wish to wait on completion.
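    ///
    /// A hypothetical call-site sketch (the `compactor`, `req`, and `machine`
    /// bindings are assumed to already exist):
    ///
    /// ```ignore
    /// if let Some(receiver) = compactor.compact_and_apply_background(req, &machine) {
    ///     // Waiting is optional; dropping `receiver` instead is also fine.
    ///     let _ = receiver.await;
    /// }
    /// ```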
    pub fn compact_and_apply_background(
        &self,
        req: CompactReq<T>,
        machine: &Machine<K, V, T, D>,
    ) -> Option<oneshot::Receiver<Result<(), anyhow::Error>>> {
        // Run some initial heuristics to ignore some requests for compaction.
        // We don't gain much from e.g. compacting two very small batches that
        // were just written, but it does result in non-trivial blob traffic
        // (especially in aggregate). This heuristic is something we'll need to
        // tune over time.
        let should_compact = req.inputs.len() >= COMPACTION_HEURISTIC_MIN_INPUTS.get(&self.cfg)
            || req
                .inputs
                .iter()
                .map(|x| x.batch.part_count())
                .sum::<usize>()
                >= COMPACTION_HEURISTIC_MIN_PARTS.get(&self.cfg)
            || req.inputs.iter().map(|x| x.batch.len).sum::<usize>()
                >= COMPACTION_HEURISTIC_MIN_UPDATES.get(&self.cfg);
        if !should_compact {
            self.metrics.compaction.skipped.inc();
            return None;
        }

        let (compaction_completed_sender, compaction_completed_receiver) = oneshot::channel();
        let new_compaction_sender = self.sender.clone();

        self.metrics.compaction.requested.inc();
        // NB: we intentionally pass along the input machine, as it ought to come from the
        // writer that generated the compaction request / maintenance. this machine has a
        // spine structure that generated the request, so it has a much better chance of
        // merging and committing the result than a machine kept up-to-date through state
        // diffs, which may have a different spine structure less amenable to merging.
        let send = new_compaction_sender.try_send((
            Instant::now(),
            req,
            machine.clone(),
            compaction_completed_sender,
        ));
        if let Err(_) = send {
            self.metrics.compaction.dropped.inc();
            return None;
        }

        Some(compaction_completed_receiver)
    }

    pub(crate) async fn compact_and_apply(
        machine: &Machine<K, V, T, D>,
        req: CompactReq<T>,
    ) -> Result<RoutineMaintenance, anyhow::Error> {
        let metrics = Arc::clone(&machine.applier.metrics);
        metrics.compaction.started.inc();
        let start = Instant::now();

        // pick a timeout for our compaction request proportional to the amount
        // of data that must be read (with a minimum set by PersistConfig)
        let total_input_bytes = req
            .inputs
            .iter()
            .map(|batch| batch.batch.encoded_size_bytes())
            .sum::<usize>();
        let timeout = Duration::max(
            // either our minimum timeout
            COMPACTION_MINIMUM_TIMEOUT.get(&machine.applier.cfg),
            // or 1s per MB of input data
            Duration::from_secs(u64::cast_from(total_input_bytes / MiB)),
        );
        // Always use the most recent schema from all the runs we're compacting, to
        // prevent Compactors created before the schema was evolved from trying to
        // "de-evolve" a Part.
        let Some(compaction_schema_id) = req
            .inputs
            .iter()
            .flat_map(|batch| batch.batch.run_meta.iter())
            .filter_map(|run_meta| run_meta.schema)
            // It's an invariant that SchemaIds are ordered.
            .max()
        else {
            metrics.compaction.schema_selection.no_schema.inc();
            metrics.compaction.failed.inc();
            return Err(anyhow!(
                "compacting {shard_id} and spine ids {spine_ids}: could not determine schema id from inputs",
                shard_id = req.shard_id,
                spine_ids = mz_ore::str::separated(", ", req.inputs.iter().map(|i| i.id))
            ));
        };
        let Some((key_schema, val_schema)) = machine.get_schema(compaction_schema_id) else {
            metrics.compaction.schema_selection.no_schema.inc();
            metrics.compaction.failed.inc();
            return Err(anyhow!(
                "compacting {shard_id} and spine ids {spine_ids}: schema id {compaction_schema_id} not present in machine state",
                shard_id = req.shard_id,
                spine_ids = mz_ore::str::separated(", ", req.inputs.iter().map(|i| i.id))
            ));
        };

        metrics.compaction.schema_selection.recent_schema.inc();

        let compaction_schema = Schemas {
            id: Some(compaction_schema_id),
            key: Arc::new(key_schema),
            val: Arc::new(val_schema),
        };

        trace!(
            "compaction request for {}MBs ({} bytes), with timeout of {}s, and schema {:?}.",
            total_input_bytes / MiB,
            total_input_bytes,
            timeout.as_secs_f64(),
            compaction_schema.id,
        );

        let isolated_runtime = Arc::clone(&machine.isolated_runtime);
        let machine_clone = machine.clone();
        let metrics_clone = Arc::clone(&machine.applier.metrics);
        let compact_span = debug_span!("compact::consolidate");
        let res = tokio::time::timeout(
            timeout,
            // Compaction is cpu intensive, so be polite and spawn it on the isolated runtime.
            isolated_runtime.spawn_named(
                || "persist::compact::consolidate",
                async move {
                    // If the batches we are compacting were written by old versions of persist,
                    // we may not have run UUIDs for them, meaning we don't have enough info to
                    // safely compact them incrementally.
                    let all_runs_have_uuids = req
                        .inputs
                        .iter()
                        .all(|x| x.batch.runs().all(|(meta, _)| meta.id.is_some()));
                    let all_runs_have_len = req
                        .inputs
                        .iter()
                        .all(|x| x.batch.runs().all(|(meta, _)| meta.len.is_some()));

                    let compact_cfg =
                        CompactConfig::new(&machine_clone.applier.cfg, machine_clone.shard_id());
                    let incremental_enabled = compact_cfg.batch.enable_incremental_compaction
                        && all_runs_have_uuids
                        && all_runs_have_len;
                    let stream = Self::compact_stream(
                        compact_cfg,
                        Arc::clone(&machine_clone.applier.state_versions.blob),
                        Arc::clone(&metrics_clone),
                        Arc::clone(&machine_clone.applier.shard_metrics),
                        Arc::clone(&machine_clone.isolated_runtime),
                        req.clone(),
                        compaction_schema,
                        incremental_enabled,
                    );

                    let maintenance = if incremental_enabled {
                        let mut maintenance = RoutineMaintenance::default();
                        pin_mut!(stream);
                        while let Some(res) = stream.next().await {
                            let res = res?;
                            let new_maintenance =
                                Self::apply(res, &metrics_clone, &machine_clone).await?;
                            maintenance.merge(new_maintenance);
                        }
                        maintenance
                    } else {
                        let res = Self::compact_all(stream, req.clone()).await?;
                        Self::apply(
                            FueledMergeRes {
                                output: res.output,
                                input: CompactionInput::Legacy,
                                new_active_compaction: None,
                            },
                            &metrics_clone,
                            &machine_clone,
                        )
                        .await?
                    };

                    Ok::<_, anyhow::Error>(maintenance)
                }
                .instrument(compact_span),
            ),
        )
        .await;

        metrics
            .compaction
            .seconds
            .inc_by(start.elapsed().as_secs_f64());
        let res = res
            .map_err(|e| {
                metrics.compaction.timed_out.inc();
                anyhow!(
                    "compaction timed out after {}s: {}",
                    timeout.as_secs_f64(),
                    e
                )
            })?
            .map_err(|e| anyhow!(e))?;

        match res {
            Ok(maintenance) => Ok(maintenance),
            Err(err) => {
                metrics.compaction.failed.inc();
                debug!(
                    "compaction for {} failed: {}",
                    machine.shard_id(),
                    err.display_with_causes()
                );
                Err(err)
            }
        }
    }

    pub async fn compact_all(
        stream: impl Stream<Item = Result<FueledMergeRes<T>, anyhow::Error>>,
        req: CompactReq<T>,
    ) -> Result<CompactRes<T>, anyhow::Error> {
        pin_mut!(stream);

        let mut all_parts = vec![];
        let mut all_run_splits = vec![];
        let mut all_run_meta = vec![];
        let mut len = 0;

        while let Some(res) = stream.next().await {
            let res = res?.output;
            let (parts, updates, run_meta, run_splits) =
                (res.parts, res.len, res.run_meta, res.run_splits);

            if updates == 0 {
                continue;
            }

            let run_offset = all_parts.len();
            if !all_parts.is_empty() {
                all_run_splits.push(run_offset);
            }
            all_run_splits.extend(run_splits.iter().map(|r| r + run_offset));
            all_run_meta.extend(run_meta);
            all_parts.extend(parts);
            len += updates;
        }

        let batches = req.inputs.iter().map(|x| x.id).collect::<BTreeSet<_>>();
        let input = input_id_range(batches);

        Ok(CompactRes {
            output: HollowBatch::new(
                req.desc.clone(),
                all_parts,
                len,
                all_run_meta,
                all_run_splits,
            ),
            input,
        })
    }

    pub async fn apply(
        res: FueledMergeRes<T>,
        metrics: &Metrics,
        machine: &Machine<K, V, T, D>,
    ) -> Result<RoutineMaintenance, anyhow::Error> {
        let (apply_merge_result, maintenance) = machine.merge_res(&res).await;

        match &apply_merge_result {
            ApplyMergeResult::AppliedExact => {
                metrics.compaction.applied.inc();
                metrics.compaction.applied_exact_match.inc();
                machine.applier.shard_metrics.compaction_applied.inc();
            }
            ApplyMergeResult::AppliedSubset => {
                metrics.compaction.applied.inc();
                metrics.compaction.applied_subset_match.inc();
                machine.applier.shard_metrics.compaction_applied.inc();
            }
            ApplyMergeResult::NotAppliedNoMatch
            | ApplyMergeResult::NotAppliedInvalidSince
            | ApplyMergeResult::NotAppliedTooManyUpdates => {
                if let ApplyMergeResult::NotAppliedTooManyUpdates = &apply_merge_result {
                    metrics.compaction.not_applied_too_many_updates.inc();
                }
                metrics.compaction.noop.inc();
                let mut part_deletes = PartDeletes::default();
                for part in &res.output.parts {
                    part_deletes.add(part);
                }
                part_deletes
                    .delete(
                        machine.applier.state_versions.blob.as_ref(),
                        machine.shard_id(),
                        GC_BLOB_DELETE_CONCURRENCY_LIMIT.get(&machine.applier.cfg),
                        &*metrics,
                        &metrics.retries.external.compaction_noop_delete,
                    )
                    .await;
            }
        };

        Ok(maintenance)
    }

    /// Compacts input batches in bounded memory.
    ///
    /// The memory bound is broken into pieces:
    ///     1. in-progress work
    ///     2. fetching parts from runs
    ///     3. additional in-flight requests to Blob
    ///
    /// 1. In-progress work is bounded by 2 * [BatchBuilderConfig::blob_target_size]. This
    ///    usage is met at two mutually exclusive moments:
    ///   * When reading in a part, we hold the columnar format in memory while writing its
    ///     contents into a heap.
    ///   * When writing a part, we hold a temporary updates buffer while encoding/writing
    ///     it into a columnar format for Blob.
    ///
    /// 2. When compacting runs, only 1 part from each one is held in memory at a time.
    ///    Compaction will determine an appropriate number of runs to compact together
    ///    given the memory bound and accounting for the reservation in (1). A minimum
    ///    of 2 * [BatchBuilderConfig::blob_target_size] of memory is expected, to be
    ///    able to at least have the capacity to compact two runs together at a time,
    ///    and more runs will be compacted together if more memory is available.
    ///
    /// 3. If there is excess memory after accounting for (1) and (2), we increase the
    ///    number of outstanding parts we can keep in-flight to Blob.
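    ///
    /// As a rough worked example (hypothetical numbers, not defaults): with a
    /// 1 GiB memory bound and a 128 MiB blob target size, the in-progress
    /// reservation is 2 * 128 MiB = 256 MiB, leaving 768 MiB for runs; any of
    /// that run budget not needed by a chunk is converted into extra
    /// outstanding parts, at roughly one part per 128 MiB:
    ///
    /// ```text
    /// in_progress = 2 * blob_target_size          = 256 MiB
    /// run_budget  = memory_bound - in_progress    = 768 MiB
    /// extra_parts = (run_budget - chunk_usage) / blob_target_size
    /// ```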
    pub fn compact_stream(
        cfg: CompactConfig,
        blob: Arc<dyn Blob>,
        metrics: Arc<Metrics>,
        shard_metrics: Arc<ShardMetrics>,
        isolated_runtime: Arc<IsolatedRuntime>,
        req: CompactReq<T>,
        write_schemas: Schemas<K, V>,
        incremental_enabled: bool,
    ) -> impl Stream<Item = Result<FueledMergeRes<T>, anyhow::Error>> {
        async_stream::stream! {
            let () = Self::validate_req(&req)?;

            // We introduced a fast-path optimization in https://github.com/MaterializeInc/materialize/pull/15363
            // but had to revert it due to a very scary bug. Here we count how many of our compaction reqs
            // could be eligible for the optimization to better understand whether it's worth trying to
            // reintroduce it.
            let mut single_nonempty_batch = None;
            for batch in &req.inputs {
                if batch.batch.len > 0 {
                    match single_nonempty_batch {
                        None => single_nonempty_batch = Some(batch),
                        Some(_previous_nonempty_batch) => {
                            single_nonempty_batch = None;
                            break;
                        }
                    }
                }
            }
            if let Some(single_nonempty_batch) = single_nonempty_batch {
                if single_nonempty_batch.batch.run_splits.len() == 0
                    && single_nonempty_batch.batch.desc.since() != &Antichain::from_elem(T::minimum())
                {
                    metrics.compaction.fast_path_eligible.inc();
                }
            }

            // Reserve space for the in-progress part to be held in memory, in both its
            // heap and columnar representations -
            let in_progress_part_reserved_memory_bytes = 2 * cfg.batch.blob_target_size;
            // - then the remaining memory will go towards pulling down as many runs as we can.
            // We'll always do at least two runs per chunk, which means we may go over this limit
            // if parts are large or the limit is low... though we do at least increment a metric
            // when that happens.
            let run_reserved_memory_bytes = cfg
                .compaction_memory_bound_bytes
                .saturating_sub(in_progress_part_reserved_memory_bytes);

            let chunked_runs = Self::chunk_runs(
                &req,
                &cfg,
                &*metrics,
                run_reserved_memory_bytes,
                req.desc.since()
            );
            let total_chunked_runs = chunked_runs.len();

            let parts_before = req.inputs.iter().map(|x| x.batch.parts.len()).sum::<usize>();
            let parts_after = chunked_runs.iter().flat_map(|(_, _, runs, _)| runs.iter().map(|(_, _, parts)| parts.len())).sum::<usize>();
            assert_eq!(parts_before, parts_after, "chunking should not change the number of parts");

            for (applied, (input, desc, runs, run_chunk_max_memory_usage)) in
                chunked_runs.into_iter().enumerate()
            {
                metrics.compaction.chunks_compacted.inc();
                metrics
                    .compaction
                    .runs_compacted
                    .inc_by(u64::cast_from(runs.len()));

                // given the runs we actually have in our batch, we might have extra memory
                // available. we reserved enough space to always have 1 in-progress part in
                // flight, but if we have excess, we can use it to increase our write parallelism
                let extra_outstanding_parts = (run_reserved_memory_bytes
                    .saturating_sub(run_chunk_max_memory_usage))
                    / cfg.batch.blob_target_size;
                let mut run_cfg = cfg.clone();
                run_cfg.batch.batch_builder_max_outstanding_parts = 1 + extra_outstanding_parts;

                let desc = if incremental_enabled {
                    desc
                } else {
                    req.desc.clone()
                };

                let runs = runs.iter()
                    .map(|(desc, meta, run)| (*desc, *meta, *run))
                    .collect::<Vec<_>>();

                let batch = Self::compact_runs(
                    &run_cfg,
                    &req.shard_id,
                    &desc,
                    runs,
                    Arc::clone(&blob),
                    Arc::clone(&metrics),
                    Arc::clone(&shard_metrics),
                    Arc::clone(&isolated_runtime),
                    write_schemas.clone(),
                )
                .await?;

                assert!(
                    (batch.len == 0 && batch.parts.len() == 0) || (batch.len > 0 && batch.parts.len() > 0),
                    "updates={}, parts={}",
                    batch.len,
                    batch.parts.len(),
                );

                // Set up active compaction metadata
                let clock = cfg.now.clone();
                let active_compaction = if applied < total_chunked_runs - 1 {
                    Some(ActiveCompaction { start_ms: clock() })
                } else {
                    None
                };

                let res = CompactRes {
                    output: batch,
                    input,
                };

                let res = FueledMergeRes {
                    output: res.output,
                    new_active_compaction: active_compaction,
                    input: res.input,
                };

                yield Ok(res);
            }
        }
    }

    /// Compacts the input batches together, returning a single compacted batch.
    /// Under the hood this just calls [Self::compact_stream] and
    /// [Self::compact_all], but it is a convenience method that allows
    /// the caller to not have to deal with the streaming API.
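    ///
    /// A hedged usage sketch, loosely mirroring the tests at the bottom of this
    /// file (all bindings other than the parameters themselves are assumed):
    ///
    /// ```ignore
    /// let res = Compactor::<K, V, T, D>::compact(
    ///     CompactConfig::new(&persist_cfg, shard_id),
    ///     Arc::clone(&blob),
    ///     Arc::clone(&metrics),
    ///     shard_metrics,
    ///     Arc::clone(&isolated_runtime),
    ///     req.clone(),
    ///     write_schemas,
    /// )
    /// .await?;
    /// ```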
    pub async fn compact(
        cfg: CompactConfig,
        blob: Arc<dyn Blob>,
        metrics: Arc<Metrics>,
        shard_metrics: Arc<ShardMetrics>,
        isolated_runtime: Arc<IsolatedRuntime>,
        req: CompactReq<T>,
        write_schemas: Schemas<K, V>,
    ) -> Result<CompactRes<T>, anyhow::Error> {
        let stream = Self::compact_stream(
            cfg,
            Arc::clone(&blob),
            Arc::clone(&metrics),
            Arc::clone(&shard_metrics),
            Arc::clone(&isolated_runtime),
            req.clone(),
            write_schemas,
            false,
        );

        Self::compact_all(stream, req).await
    }

    /// Chunks runs with the following rules:
    /// 1. Runs from multiple batches are allowed to be mixed as long as _every_ run in the
    ///    batch is present in the chunk.
    /// 2. Otherwise, runs are split into chunks of runs from a single batch.
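    ///
    /// An illustrative sketch of the possible outcome (batch and run sizes are
    /// hypothetical):
    ///
    /// ```text
    /// batches: A (2 runs), B (1 run), C (4 large runs)
    /// budget:  fits A + B together, but not C
    ///
    /// chunk 1: all runs of A and B   -> CompactionInput::IdRange(..)
    /// chunk 2: first 2 runs of C     -> CompactionInput::PartialBatch(C, {run ids})
    /// chunk 3: last 2 runs of C      -> CompactionInput::PartialBatch(C, {run ids})
    /// ```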
    fn chunk_runs<'a>(
        req: &'a CompactReq<T>,
        cfg: &CompactConfig,
        metrics: &Metrics,
        run_reserved_memory_bytes: usize,
        since: &Antichain<T>,
    ) -> Vec<(
        CompactionInput,
        Description<T>,
        Vec<(&'a Description<T>, &'a RunMeta, &'a [RunPart<T>])>,
        usize,
    )> {
        // Assert that all of the inputs are contiguous / can be compacted together.
        let _ = input_id_range(req.inputs.iter().map(|x| x.id).collect());

        // Iterate through batches by spine id.
        let mut batches: Vec<_> = req.inputs.iter().map(|x| (x.id, &*x.batch)).collect();
        batches.sort_by_key(|(id, _)| *id);

        let mut chunks = vec![];
        let mut current_chunk_ids = BTreeSet::new();
        let mut current_chunk_descs = Vec::new();
        let mut current_chunk_runs = vec![];
        let mut current_chunk_max_memory_usage = 0;

        fn max_part_bytes<T>(parts: &[RunPart<T>], cfg: &CompactConfig) -> usize {
            parts
                .iter()
                .map(|p| p.max_part_bytes())
                .max()
                .unwrap_or(cfg.batch.blob_target_size)
        }

        fn desc_range<T: Timestamp>(
            descs: impl IntoIterator<Item = Description<T>>,
            since: Antichain<T>,
        ) -> Description<T> {
            let mut descs = descs.into_iter();
            let first = descs.next().expect("non-empty set of descriptions");
            let lower = first.lower().clone();
            let mut upper = first.upper().clone();
            for desc in descs {
                assert_eq!(&upper, desc.lower());
                upper = desc.upper().clone();
            }
            let upper = upper.clone();
            Description::new(lower, upper, since)
        }

        for (spine_id, batch) in batches {
            let batch_size = batch
                .runs()
                .map(|(_, parts)| max_part_bytes(parts, cfg))
                .sum::<usize>();

            let num_runs = batch.run_meta.len();

            let runs = batch.runs().flat_map(|(meta, parts)| {
                if meta.order.unwrap_or(RunOrder::Codec) == cfg.batch.preferred_order {
                    Either::Left(std::iter::once((&batch.desc, meta, parts)))
                } else {
                    // The downstream consolidation step will handle a long run that's not in
                    // the desired order by splitting it up into many single-element runs. This preserves
                    // correctness, but it means that we may end up needing to iterate through
                    // many more parts concurrently than expected, increasing memory use. Instead,
                    // we break up those runs into individual batch parts, fetching hollow runs as
                    // necessary, before they're grouped together to be passed to consolidation.
                    // The downside is that this breaks the usual property that compaction produces
                    // fewer runs than it takes in. This should generally be resolved by future
                    // runs of compaction.
                    soft_assert_or_log!(
                        !parts.iter().any(|r| matches!(r, RunPart::Many(_))),
                        "unexpected out-of-order hollow run"
                    );
                    Either::Right(
                        parts
                            .iter()
                            .map(move |p| (&batch.desc, meta, std::slice::from_ref(p))),
                    )
                }
            });

            // Combine the given batch into the current chunk
            // - if they fit within the memory budget, or
            // - if the chunk and the batch together have at most two runs, since otherwise
            //   compaction wouldn't make progress.
            if current_chunk_max_memory_usage + batch_size <= run_reserved_memory_bytes
                || current_chunk_runs.len() + num_runs <= 2
            {
                if current_chunk_max_memory_usage + batch_size > run_reserved_memory_bytes {
                    // We've chosen to merge these batches together despite being over budget,
                    // which should be rare.
                    metrics.compaction.memory_violations.inc();
                }
                current_chunk_ids.insert(spine_id);
                current_chunk_descs.push(batch.desc.clone());
                current_chunk_runs.extend(runs);
                current_chunk_max_memory_usage += batch_size;
                continue;
            }

            // Otherwise, we cannot mix this batch partially. Flush any existing mixed chunk first.
            if !current_chunk_ids.is_empty() {
                chunks.push((
                    input_id_range(std::mem::take(&mut current_chunk_ids)),
                    desc_range(mem::take(&mut current_chunk_descs), since.clone()),
                    std::mem::take(&mut current_chunk_runs),
                    current_chunk_max_memory_usage,
                ));
                current_chunk_max_memory_usage = 0;
            }

            // If the batch fits within limits, try and accumulate future batches into it.
            if batch_size <= run_reserved_memory_bytes {
                current_chunk_ids.insert(spine_id);
                current_chunk_descs.push(batch.desc.clone());
                current_chunk_runs.extend(runs);
                current_chunk_max_memory_usage += batch_size;
                continue;
            }

            // This batch is too large to compact with others, or even in a single go.
            // Process this batch alone, splitting into single-batch chunks as needed.
            let mut run_iter = runs.into_iter().peekable();
            debug_assert!(current_chunk_ids.is_empty());
            debug_assert!(current_chunk_descs.is_empty());
            debug_assert!(current_chunk_runs.is_empty());
            debug_assert_eq!(current_chunk_max_memory_usage, 0);
            let mut current_chunk_run_ids = BTreeSet::new();

            while let Some((desc, meta, parts)) = run_iter.next() {
                let run_size = max_part_bytes(parts, cfg);
                current_chunk_runs.push((desc, meta, parts));
                current_chunk_max_memory_usage += run_size;
                current_chunk_run_ids.extend(meta.id);

                if let Some((_, _meta, next_parts)) = run_iter.peek() {
                    let next_size = max_part_bytes(next_parts, cfg);
                    if current_chunk_max_memory_usage + next_size > run_reserved_memory_bytes {
                        // If the current chunk only has one run, record a memory violation metric.
                        if current_chunk_runs.len() == 1 {
                            metrics.compaction.memory_violations.inc();
                            continue;
                        }
                        // Flush the current chunk and start a new one.
                        chunks.push((
                            CompactionInput::PartialBatch(
                                spine_id,
                                mem::take(&mut current_chunk_run_ids),
                            ),
                            desc_range([batch.desc.clone()], since.clone()),
                            std::mem::take(&mut current_chunk_runs),
                            current_chunk_max_memory_usage,
                        ));
                        current_chunk_max_memory_usage = 0;
                    }
                }
            }

            if !current_chunk_runs.is_empty() {
                chunks.push((
                    CompactionInput::PartialBatch(spine_id, mem::take(&mut current_chunk_run_ids)),
                    desc_range([batch.desc.clone()], since.clone()),
                    std::mem::take(&mut current_chunk_runs),
                    current_chunk_max_memory_usage,
                ));
                current_chunk_max_memory_usage = 0;
            }
        }

        // If we ended with a mixed-batch chunk in progress, flush it.
        if !current_chunk_ids.is_empty() {
            chunks.push((
                input_id_range(current_chunk_ids),
                desc_range(current_chunk_descs, since.clone()),
                current_chunk_runs,
                current_chunk_max_memory_usage,
            ));
        }

        chunks
    }

    /// Compacts runs together. If the input runs are sorted, a single run will be created as output.
    ///
    /// Maximum possible memory usage is `(# runs + 2) * [crate::PersistConfig::blob_target_size]`
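    ///
    /// For instance (illustrative numbers only): compacting 6 runs with a
    /// 128 MiB blob target size can use up to (6 + 2) * 128 MiB = 1 GiB.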
    pub(crate) async fn compact_runs(
        cfg: &CompactConfig,
        shard_id: &ShardId,
        desc: &Description<T>,
        runs: Vec<(&Description<T>, &RunMeta, &[RunPart<T>])>,
        blob: Arc<dyn Blob>,
        metrics: Arc<Metrics>,
        shard_metrics: Arc<ShardMetrics>,
        isolated_runtime: Arc<IsolatedRuntime>,
        write_schemas: Schemas<K, V>,
    ) -> Result<HollowBatch<T>, anyhow::Error> {
        // TODO: Figure out a more principled way to allocate our memory budget.
        // Currently, we give any excess budget to write parallelism. If we had
        // to pick between 100% towards writes vs 100% towards reads, then reads
        // is almost certainly better, but the ideal is probably somewhere in
        // between the two.
        //
        // For now, invent some extra budget out of thin air for prefetch.
        let prefetch_budget_bytes = 2 * cfg.batch.blob_target_size;

        let mut timings = Timings::default();

        let mut batch_cfg = cfg.batch.clone();

        // Use compaction as a method of getting inline writes out of state, to
        // make room for more inline writes. We could instead do this at the end
        // of compaction by flushing out the batch, but doing it here based on
        // the config allows BatchBuilder to do its normal pipelining of writes.
        batch_cfg.inline_writes_single_max_bytes = 0;

        let parts = BatchParts::new_ordered::<D>(
            batch_cfg,
            cfg.batch.preferred_order,
            Arc::clone(&metrics),
            Arc::clone(&shard_metrics),
            *shard_id,
            Arc::clone(&blob),
            Arc::clone(&isolated_runtime),
            &metrics.compaction.batch,
        );
        let mut batch = BatchBuilderInternal::<K, V, T, D>::new(
            cfg.batch.clone(),
            parts,
            Arc::clone(&metrics),
            write_schemas.clone(),
            Arc::clone(&blob),
            shard_id.clone(),
            cfg.version.clone(),
        );

        let mut consolidator = Consolidator::new(
            format!(
                "{}[lower={:?},upper={:?}]",
                shard_id,
                desc.lower().elements(),
                desc.upper().elements()
            ),
            cfg.fetch_config.clone(),
            *shard_id,
            StructuredSort::<K, V, T, D>::new(write_schemas.clone()),
            blob,
            Arc::clone(&metrics),
            shard_metrics,
            metrics.read.compaction.clone(),
            FetchBatchFilter::Compaction {
                since: desc.since().clone(),
            },
            None,
            prefetch_budget_bytes,
        );

        for (desc, meta, parts) in runs {
            consolidator.enqueue_run(desc, meta, parts.iter().cloned());
        }

        let remaining_budget = consolidator.start_prefetches();
        if remaining_budget.is_none() {
            metrics.compaction.not_all_prefetched.inc();
        }

        loop {
            let mut chunks = vec![];
            let mut total_bytes = 0;
            // We attempt to pull chunks out of the consolidator that match our target size,
            // but it's possible that we may get smaller chunks... for example, if not all
            // parts have been fetched yet. Loop until we've got enough data to justify flushing
            // it out to blob (or we run out of data.)
            while total_bytes < cfg.batch.blob_target_size {
                let fetch_start = Instant::now();
                let Some(chunk) = consolidator
                    .next_chunk(
                        cfg.compaction_yield_after_n_updates,
                        cfg.batch.blob_target_size - total_bytes,
                    )
                    .await?
                else {
                    break;
                };
                timings.part_fetching += fetch_start.elapsed();
                total_bytes += chunk.goodbytes();
                chunks.push(chunk);
                tokio::task::yield_now().await;
            }

            // In the hopefully-common case of a single chunk, this will not copy.
            let Some(updates) = Part::concat(&chunks).expect("compaction produces well-typed data")
            else {
                break;
            };
            batch.flush_part(desc.clone(), updates).await;
        }
        let mut batch = batch.finish(desc.clone()).await?;

        // We use compaction as a method of getting inline writes out of state,
        // to make room for more inline writes. This happens in
        // `CompactConfig::new` by overriding the inline writes threshold
        // config. This is a bit action-at-a-distance, so defensively detect if
        // this breaks here and log and correct it if so.
        let has_inline_parts = batch.batch.parts.iter().any(|x| x.is_inline());
        if has_inline_parts {
            error!(%shard_id, ?cfg, "compaction result unexpectedly had inline writes");
            let () = batch
                .flush_to_blob(
                    &cfg.batch,
                    &metrics.compaction.batch,
                    &isolated_runtime,
                    &write_schemas,
                )
                .await;
        }

        timings.record(&metrics);
        Ok(batch.into_hollow_batch())
    }

    fn validate_req(req: &CompactReq<T>) -> Result<(), anyhow::Error> {
        let mut frontier = req.desc.lower();
        for input in req.inputs.iter() {
            if PartialOrder::less_than(req.desc.since(), input.batch.desc.since()) {
                return Err(anyhow!(
                    "output since {:?} must be at or in advance of input since {:?}",
                    req.desc.since(),
                    input.batch.desc.since()
                ));
            }
            if frontier != input.batch.desc.lower() {
                return Err(anyhow!(
                    "invalid merge of non-consecutive batches {:?} vs {:?}",
                    frontier,
                    input.batch.desc.lower()
                ));
            }
            frontier = input.batch.desc.upper();
        }
        if frontier != req.desc.upper() {
            return Err(anyhow!(
                "invalid merge of non-consecutive batches {:?} vs {:?}",
                frontier,
                req.desc.upper()
            ));
        }
        Ok(())
    }
}

#[derive(Debug, Default)]
struct Timings {
    part_fetching: Duration,
    heap_population: Duration,
}

impl Timings {
    fn record(self, metrics: &Metrics) {
        // intentionally deconstruct so we don't forget to consider each field
        let Timings {
            part_fetching,
            heap_population,
        } = self;

        metrics
            .compaction
            .steps
            .part_fetch_seconds
            .inc_by(part_fetching.as_secs_f64());
        metrics
            .compaction
            .steps
            .heap_population_seconds
            .inc_by(heap_population.as_secs_f64());
    }
}

#[cfg(test)]
mod tests {
    use mz_dyncfg::ConfigUpdates;
    use mz_ore::{assert_contains, assert_err};
    use mz_persist_types::codec_impls::StringSchema;
    use timely::progress::Antichain;

    use crate::PersistLocation;
    use crate::batch::BLOB_TARGET_SIZE;
    use crate::internal::trace::SpineId;
    use crate::tests::{all_ok, expect_fetch_part, new_test_client_cache};

    use super::*;

    // A regression test for a bug caught during development of materialize#13160 (never
    // made it to main) where batches written by compaction would always have a
    // since of the minimum timestamp.
    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
    async fn regression_minimum_since(dyncfgs: ConfigUpdates) {
        let data = vec![
            (("0".to_owned(), "zero".to_owned()), 0, 1),
            (("0".to_owned(), "zero".to_owned()), 1, -1),
            (("1".to_owned(), "one".to_owned()), 1, 1),
        ];

        let cache = new_test_client_cache(&dyncfgs);
        cache.cfg.set_config(&BLOB_TARGET_SIZE, 100);
        let (mut write, _) = cache
            .open(PersistLocation::new_in_mem())
            .await
            .expect("client construction failed")
            .expect_open::<String, String, u64, i64>(ShardId::new())
            .await;
        let b0 = write
            .expect_batch(&data[..1], 0, 1)
            .await
            .into_hollow_batch();
        let b1 = write
            .expect_batch(&data[1..], 1, 2)
            .await
            .into_hollow_batch();

        let req = CompactReq {
            shard_id: write.machine.shard_id(),
            desc: Description::new(
                b0.desc.lower().clone(),
                b1.desc.upper().clone(),
                Antichain::from_elem(10u64),
            ),
            inputs: vec![
                IdHollowBatch {
                    batch: Arc::new(b0),
                    id: SpineId(0, 1),
                },
                IdHollowBatch {
                    batch: Arc::new(b1),
                    id: SpineId(1, 2),
                },
            ],
        };
        let schemas = Schemas {
            id: None,
            key: Arc::new(StringSchema),
            val: Arc::new(StringSchema),
        };
        let res = Compactor::<String, String, u64, i64>::compact(
            CompactConfig::new(&write.cfg, write.shard_id()),
            Arc::clone(&write.blob),
            Arc::clone(&write.metrics),
            write.metrics.shards.shard(&write.machine.shard_id(), ""),
            Arc::new(IsolatedRuntime::new_for_tests()),
            req.clone(),
            schemas.clone(),
        )
        .await
        .expect("compaction failed");

        assert_eq!(res.output.desc, req.desc);
        assert_eq!(res.output.len, 1);
        assert_eq!(res.output.part_count(), 1);
        let part = res.output.parts[0].expect_hollow_part();
        let (part, updates) = expect_fetch_part(
            write.blob.as_ref(),
            &part.key.complete(&write.machine.shard_id()),
            &write.metrics,
            &schemas,
        )
        .await;
        assert_eq!(part.desc, res.output.desc);
        assert_eq!(updates, all_ok(&data, 10));
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
    async fn disable_compaction(dyncfgs: ConfigUpdates) {
        let data = [
            (("0".to_owned(), "zero".to_owned()), 0, 1),
            (("0".to_owned(), "zero".to_owned()), 1, -1),
            (("1".to_owned(), "one".to_owned()), 1, 1),
        ];

        let cache = new_test_client_cache(&dyncfgs);
        cache.cfg.set_config(&BLOB_TARGET_SIZE, 100);
        let (mut write, _) = cache
            .open(PersistLocation::new_in_mem())
            .await
            .expect("client construction failed")
            .expect_open::<String, String, u64, i64>(ShardId::new())
            .await;
        let b0 = write
            .expect_batch(&data[..1], 0, 1)
            .await
            .into_hollow_batch();
        let b1 = write
            .expect_batch(&data[1..], 1, 2)
            .await
            .into_hollow_batch();

        let req = CompactReq {
            shard_id: write.machine.shard_id(),
            desc: Description::new(
                b0.desc.lower().clone(),
                b1.desc.upper().clone(),
                Antichain::from_elem(10u64),
            ),
            inputs: vec![
                IdHollowBatch {
                    batch: Arc::new(b0),
                    id: SpineId(0, 1),
                },
                IdHollowBatch {
                    batch: Arc::new(b1),
                    id: SpineId(1, 2),
                },
            ],
        };
        write.cfg.set_config(&COMPACTION_HEURISTIC_MIN_INPUTS, 1);
        let compactor = write.compact.as_ref().expect("compaction hard disabled");

        write.cfg.disable_compaction();
        let result = compactor
            .compact_and_apply_background(req.clone(), &write.machine)
            .expect("listener")
            .await
            .expect("channel closed");
        assert_err!(result);
        assert_contains!(result.unwrap_err().to_string(), "compaction disabled");

        write.cfg.enable_compaction();
        compactor
            .compact_and_apply_background(req, &write.machine)
            .expect("listener")
            .await
            .expect("channel closed")
            .expect("compaction success");

        // Make sure our CYA dyncfg works.
        let data2 = [
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("2".to_owned(), "two".to_owned()), 3, -1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
        ];

        let b2 = write
            .expect_batch(&data2[..1], 2, 3)
            .await
            .into_hollow_batch();
        let b3 = write
            .expect_batch(&data2[1..], 3, 4)
            .await
            .into_hollow_batch();

        let req = CompactReq {
            shard_id: write.machine.shard_id(),
            desc: Description::new(
                b2.desc.lower().clone(),
                b3.desc.upper().clone(),
                Antichain::from_elem(20u64),
            ),
            inputs: vec![
                IdHollowBatch {
                    batch: Arc::new(b2),
                    id: SpineId(0, 1),
                },
                IdHollowBatch {
                    batch: Arc::new(b3),
                    id: SpineId(1, 2),
                },
            ],
        };
        let compactor = write.compact.as_ref().expect("compaction hard disabled");

        // When the dyncfg is set to false we should ignore the process flag.
        write.cfg.set_config(&COMPACTION_CHECK_PROCESS_FLAG, false);
        write.cfg.disable_compaction();
        // Compaction still succeeded!
        compactor
            .compact_and_apply_background(req, &write.machine)
            .expect("listener")
            .await
            .expect("channel closed")
            .expect("compaction success");
    }
}
1339}