mz_persist_client/internal/compact.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::borrow::Cow;
11use std::collections::{BTreeMap, BTreeSet};
12use std::fmt::Debug;
13use std::marker::PhantomData;
14use std::pin::pin;
15use std::sync::Arc;
16use std::time::{Duration, Instant};
17
18use anyhow::anyhow;
19use differential_dataflow::difference::Semigroup;
20use differential_dataflow::lattice::Lattice;
21use differential_dataflow::trace::Description;
22use futures::{Stream, pin_mut};
23use futures_util::StreamExt;
24use itertools::Itertools;
25use mz_dyncfg::Config;
26use mz_ore::cast::CastFrom;
27use mz_ore::error::ErrorExt;
28use mz_ore::now::NowFn;
29use mz_persist::location::Blob;
30use mz_persist_types::part::Part;
31use mz_persist_types::{Codec, Codec64};
32use timely::PartialOrder;
33use timely::progress::{Antichain, Timestamp};
34use tokio::sync::mpsc::Sender;
35use tokio::sync::{TryAcquireError, mpsc, oneshot};
36use tracing::{Instrument, Span, debug, debug_span, error, trace, warn};
37
38use crate::async_runtime::IsolatedRuntime;
39use crate::batch::{BatchBuilderConfig, BatchBuilderInternal, BatchParts, PartDeletes};
40use crate::cfg::{
41    COMPACTION_HEURISTIC_MIN_INPUTS, COMPACTION_HEURISTIC_MIN_PARTS,
42    COMPACTION_HEURISTIC_MIN_UPDATES, COMPACTION_MEMORY_BOUND_BYTES,
43    GC_BLOB_DELETE_CONCURRENCY_LIMIT, MiB,
44};
45use crate::fetch::{FetchBatchFilter, FetchConfig};
46use crate::internal::encoding::Schemas;
47use crate::internal::gc::GarbageCollector;
48use crate::internal::machine::Machine;
49use crate::internal::maintenance::RoutineMaintenance;
50use crate::internal::metrics::ShardMetrics;
51use crate::internal::state::{
52    ENABLE_INCREMENTAL_COMPACTION, HollowBatch, RunId, RunMeta, RunOrder, RunPart,
53};
54use crate::internal::trace::{
55    ActiveCompaction, ApplyMergeResult, CompactionInput, FueledMergeRes, IdHollowBatch, SpineId,
56    id_range,
57};
58use crate::iter::{Consolidator, StructuredSort};
59use crate::{Metrics, PersistConfig, ShardId};
60
61/// A request for compaction.
62///
63/// This is similar to FueledMergeReq, but intentionally a different type. If we
/// move compaction to an RPC server, this one will become a protobuf; the type
65/// parameters will become names of codecs to look up in some registry.
66#[derive(Debug, Clone)]
67pub struct CompactReq<T> {
68    /// The shard the input and output batches belong to.
69    pub shard_id: ShardId,
70    /// A description for the output batch.
71    pub desc: Description<T>,
    /// The updates to include in the output batch. Any data in these outside of
    /// the output description's bounds should be ignored.
74    pub inputs: Vec<IdHollowBatch<T>>,
75}
76
77/// A response from compaction.
78#[derive(Debug)]
79pub struct CompactRes<T> {
80    /// The compacted batch.
81    pub output: HollowBatch<T>,
82    /// The runs that were compacted together to produce the output batch.
83    pub input: CompactionInput,
84}
85
/// A location in a spine, identifying the batch (by `SpineId`) that a run belongs
/// to and, when known, the run itself.
/// If the run is not known, the `RunId` is `None`.
88#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
89pub struct RunLocation(pub SpineId, pub Option<RunId>);
90
91/// A snapshot of dynamic configs to make it easier to reason about an
92/// individual run of compaction.
93#[derive(Debug, Clone)]
94pub struct CompactConfig {
95    pub(crate) compaction_memory_bound_bytes: usize,
96    pub(crate) compaction_yield_after_n_updates: usize,
97    pub(crate) incremental_compaction: bool,
98    pub(crate) version: semver::Version,
99    pub(crate) batch: BatchBuilderConfig,
100    pub(crate) fetch_config: FetchConfig,
101    pub(crate) now: NowFn,
102}
103
104impl CompactConfig {
105    /// Initialize the compaction config from Persist configuration.
106    pub fn new(value: &PersistConfig, shard_id: ShardId) -> Self {
107        CompactConfig {
108            compaction_memory_bound_bytes: COMPACTION_MEMORY_BOUND_BYTES.get(value),
109            compaction_yield_after_n_updates: value.compaction_yield_after_n_updates,
110            incremental_compaction: ENABLE_INCREMENTAL_COMPACTION.get(value),
111            version: value.build_version.clone(),
112            batch: BatchBuilderConfig::new(value, shard_id),
113            fetch_config: FetchConfig::from_persist_config(value),
114            now: value.now.clone(),
115        }
116    }
117}
118
119/// A service for performing physical and logical compaction.
120///
121/// This will possibly be called over RPC in the future. Physical compaction is
122/// merging adjacent batches. Logical compaction is advancing timestamps to a
123/// new since and consolidating the resulting updates.
124#[derive(Debug)]
125pub struct Compactor<K, V, T, D> {
126    cfg: PersistConfig,
127    metrics: Arc<Metrics>,
128    sender: Sender<(
129        Instant,
130        CompactReq<T>,
131        Machine<K, V, T, D>,
132        oneshot::Sender<Result<(), anyhow::Error>>,
133    )>,
134    _phantom: PhantomData<fn() -> D>,
135}
136
137impl<K, V, T, D> Clone for Compactor<K, V, T, D> {
138    fn clone(&self) -> Self {
139        Compactor {
140            cfg: self.cfg.clone(),
141            metrics: Arc::clone(&self.metrics),
142            sender: self.sender.clone(),
143            _phantom: Default::default(),
144        }
145    }
146}
147
/// In Compactor::compact_and_apply_background, the minimum amount of time to
/// allow a compaction request to run before timing it out. A request may be
/// given a timeout greater than this value depending on the size of its inputs.
151pub(crate) const COMPACTION_MINIMUM_TIMEOUT: Config<Duration> = Config::new(
152    "persist_compaction_minimum_timeout",
153    Duration::from_secs(90),
154    "\
155    The minimum amount of time to allow a persist compaction request to run \
156    before timing it out (Materialize).",
157);
158
159pub(crate) const COMPACTION_USE_MOST_RECENT_SCHEMA: Config<bool> = Config::new(
160    "persist_compaction_use_most_recent_schema",
161    true,
162    "\
163    Use the most recent schema from all the Runs that are currently being \
    compacted, instead of the schema on the current write handle (Materialize).",
166);
167
168pub(crate) const COMPACTION_CHECK_PROCESS_FLAG: Config<bool> = Config::new(
169    "persist_compaction_check_process_flag",
170    true,
171    "Whether Compactor will obey the process_requests flag in PersistConfig, \
172        which allows dynamically disabling compaction. If false, all compaction requests will be processed.",
173);
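
// Note: when this flag is enabled, disabling compaction via the persist config causes queued
// requests to be rejected with a "compaction disabled" error; when it is disabled, that
// process-level flag is ignored and requests are processed anyway (see the
// `disable_compaction` test at the bottom of this file).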
174
/// Create a [`CompactionInput::IdRange`] from a set of [`SpineId`]s.
176fn input_id_range(ids: BTreeSet<SpineId>) -> CompactionInput {
177    let id = id_range(ids);
178
179    CompactionInput::IdRange(id)
180}
181
182impl<K, V, T, D> Compactor<K, V, T, D>
183where
184    K: Debug + Codec,
185    V: Debug + Codec,
186    T: Timestamp + Lattice + Codec64 + Sync,
187    D: Semigroup + Ord + Codec64 + Send + Sync,
188{
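    /// Creates a new [Compactor] and spawns the background scheduler task that
    /// pulls queued [CompactReq]s off an internal channel, applies the configured
    /// concurrency limit, and runs each request via [Self::compact_and_apply].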
189    pub fn new(
190        cfg: PersistConfig,
191        metrics: Arc<Metrics>,
192        write_schemas: Schemas<K, V>,
193        gc: GarbageCollector<K, V, T, D>,
194    ) -> Self {
195        let (compact_req_sender, mut compact_req_receiver) = mpsc::channel::<(
196            Instant,
197            CompactReq<T>,
198            Machine<K, V, T, D>,
199            oneshot::Sender<Result<(), anyhow::Error>>,
200        )>(cfg.compaction_queue_size);
201        let concurrency_limit = Arc::new(tokio::sync::Semaphore::new(
202            cfg.compaction_concurrency_limit,
203        ));
204        let check_process_requests = COMPACTION_CHECK_PROCESS_FLAG.handle(&cfg.configs);
205        let process_requests = Arc::clone(&cfg.compaction_process_requests);
206
        // Spin off a single task responsible for executing compaction requests.
        // Work is enqueued into the task through a channel.
209        let _worker_handle = mz_ore::task::spawn(|| "PersistCompactionScheduler", async move {
210            while let Some((enqueued, req, machine, completer)) = compact_req_receiver.recv().await
211            {
212                assert_eq!(req.shard_id, machine.shard_id());
213                let metrics = Arc::clone(&machine.applier.metrics);
214
215                // Only allow skipping compaction requests if the dyncfg is enabled.
216                if check_process_requests.get()
217                    && !process_requests.load(std::sync::atomic::Ordering::Relaxed)
218                {
219                    // Respond to the requester, track in our metrics, and log
220                    // that compaction is disabled.
221                    let _ = completer.send(Err(anyhow::anyhow!("compaction disabled")));
222                    metrics.compaction.disabled.inc();
223                    tracing::warn!(shard_id = ?req.shard_id, "Dropping compaction request on the floor.");
224
225                    continue;
226                }
227
228                let permit = {
229                    let inner = Arc::clone(&concurrency_limit);
                    // Perform a non-blocking attempt to acquire a permit so we can
                    // record how often we're blocked on the concurrency limit.
232                    match inner.try_acquire_owned() {
233                        Ok(permit) => permit,
234                        Err(TryAcquireError::NoPermits) => {
235                            metrics.compaction.concurrency_waits.inc();
236                            Arc::clone(&concurrency_limit)
237                                .acquire_owned()
238                                .await
239                                .expect("semaphore is never closed")
240                        }
241                        Err(TryAcquireError::Closed) => {
                            // This should never happen in practice: the semaphore is
                            // never explicitly closed, nor will it close on Drop.
244                            warn!("semaphore for shard {} is closed", machine.shard_id());
245                            continue;
246                        }
247                    }
248                };
249                metrics
250                    .compaction
251                    .queued_seconds
252                    .inc_by(enqueued.elapsed().as_secs_f64());
253
254                let write_schemas = write_schemas.clone();
255
256                let compact_span =
257                    debug_span!(parent: None, "compact::apply", shard_id=%machine.shard_id());
258                compact_span.follows_from(&Span::current());
259                let gc = gc.clone();
260                mz_ore::task::spawn(|| "PersistCompactionWorker", async move {
261                    let res = Self::compact_and_apply(&machine, req, write_schemas)
262                        .instrument(compact_span)
263                        .await;
264                    if let Ok(maintenance) = res {
265                        maintenance.start_performing(&machine, &gc);
266                    }
267
                    // We can safely ignore errors here; it's possible the caller
                    // wasn't interested in waiting and dropped their receiver.
270                    let _ = completer.send(Ok(()));
271
                    // Move `permit` into the async scope so it is dropped upon completion.
273                    drop(permit);
274                });
275            }
276        });
277
278        Compactor {
279            cfg,
280            metrics,
281            sender: compact_req_sender,
282            _phantom: PhantomData,
283        }
284    }
285
286    /// Enqueues a [CompactReq] to be consumed by the compaction background task when available.
287    ///
288    /// Returns a receiver that indicates when compaction has completed. The receiver can be
289    /// safely dropped at any time if the caller does not wish to wait on completion.
290    pub fn compact_and_apply_background(
291        &self,
292        req: CompactReq<T>,
293        machine: &Machine<K, V, T, D>,
294    ) -> Option<oneshot::Receiver<Result<(), anyhow::Error>>> {
295        // Run some initial heuristics to ignore some requests for compaction.
296        // We don't gain much from e.g. compacting two very small batches that
297        // were just written, but it does result in non-trivial blob traffic
298        // (especially in aggregate). This heuristic is something we'll need to
299        // tune over time.
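        // For illustration only (hypothetical threshold values, not necessarily the real
        // defaults): with COMPACTION_HEURISTIC_MIN_INPUTS = 8, ..._MIN_PARTS = 8, and
        // ..._MIN_UPDATES = 1024, a request of two single-part batches with a handful of
        // updates is skipped, while one spanning ten batches is always enqueued.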
300        let should_compact = req.inputs.len() >= COMPACTION_HEURISTIC_MIN_INPUTS.get(&self.cfg)
301            || req
302                .inputs
303                .iter()
304                .map(|x| x.batch.part_count())
305                .sum::<usize>()
306                >= COMPACTION_HEURISTIC_MIN_PARTS.get(&self.cfg)
307            || req.inputs.iter().map(|x| x.batch.len).sum::<usize>()
308                >= COMPACTION_HEURISTIC_MIN_UPDATES.get(&self.cfg);
309        if !should_compact {
310            self.metrics.compaction.skipped.inc();
311            return None;
312        }
313
314        let (compaction_completed_sender, compaction_completed_receiver) = oneshot::channel();
315        let new_compaction_sender = self.sender.clone();
316
317        self.metrics.compaction.requested.inc();
        // NB: We intentionally pass along the input machine, as it ought to come from the
        // writer that generated the compaction request / maintenance. This machine has the
        // spine structure that generated the request, so it has a much better chance of
        // merging and committing the result than a machine kept up-to-date through state
        // diffs, which may have a different spine structure less amenable to merging.
323        let send = new_compaction_sender.try_send((
324            Instant::now(),
325            req,
326            machine.clone(),
327            compaction_completed_sender,
328        ));
        if send.is_err() {
330            self.metrics.compaction.dropped.inc();
331            return None;
332        }
333
334        Some(compaction_completed_receiver)
335    }
336
337    pub(crate) async fn compact_and_apply(
338        machine: &Machine<K, V, T, D>,
339        req: CompactReq<T>,
340        write_schemas: Schemas<K, V>,
341    ) -> Result<RoutineMaintenance, anyhow::Error> {
342        let metrics = Arc::clone(&machine.applier.metrics);
343        metrics.compaction.started.inc();
344        let start = Instant::now();
345
346        // pick a timeout for our compaction request proportional to the amount
347        // of data that must be read (with a minimum set by PersistConfig)
348        let total_input_bytes = req
349            .inputs
350            .iter()
351            .map(|batch| batch.batch.encoded_size_bytes())
352            .sum::<usize>();
353        let timeout = Duration::max(
354            // either our minimum timeout
355            COMPACTION_MINIMUM_TIMEOUT.get(&machine.applier.cfg),
356            // or 1s per MB of input data
357            Duration::from_secs(u64::cast_from(total_input_bytes / MiB)),
358        );
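        // For example: 50 MiB of input gets max(90s, 50s) = the 90s minimum, while 4 GiB
        // of input gets max(90s, 4096s), i.e. a little over an hour (assuming the default
        // 90s minimum; the actual minimum is read from the config above).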
        // Always use the most recent schema from all the Runs we're compacting, to prevent
        // Compactors created before the schema was evolved from trying to "de-evolve" a Part.
361        let compaction_schema_id = req
362            .inputs
363            .iter()
364            .flat_map(|batch| batch.batch.run_meta.iter())
365            .filter_map(|run_meta| run_meta.schema)
366            // It's an invariant that SchemaIds are ordered.
367            .max();
368        let maybe_compaction_schema = match compaction_schema_id {
369            Some(id) => machine
370                .get_schema(id)
371                .map(|(key_schema, val_schema)| (id, key_schema, val_schema)),
372            None => None,
373        };
374        let use_most_recent_schema = COMPACTION_USE_MOST_RECENT_SCHEMA.get(&machine.applier.cfg);
375
376        let compaction_schema = match maybe_compaction_schema {
377            Some((id, key_schema, val_schema)) if use_most_recent_schema => {
378                metrics.compaction.schema_selection.recent_schema.inc();
379                Schemas {
380                    id: Some(id),
381                    key: Arc::new(key_schema),
382                    val: Arc::new(val_schema),
383                }
384            }
385            Some(_) => {
386                metrics.compaction.schema_selection.disabled.inc();
387                write_schemas
388            }
389            None => {
390                metrics.compaction.schema_selection.no_schema.inc();
391                write_schemas
392            }
393        };
394
395        trace!(
            "compaction request for {} MiB ({} bytes), with timeout of {}s, and schema {:?}.",
397            total_input_bytes / MiB,
398            total_input_bytes,
399            timeout.as_secs_f64(),
400            compaction_schema.id,
401        );
402
403        let isolated_runtime = Arc::clone(&machine.isolated_runtime);
404        let machine_clone = machine.clone();
405        let metrics_clone = Arc::clone(&machine.applier.metrics);
406        let compact_span = debug_span!("compact::consolidate");
407        let res = tokio::time::timeout(
408            timeout,
            // Compaction is CPU intensive, so be polite and spawn it on the isolated runtime.
410            isolated_runtime.spawn_named(
411                || "persist::compact::consolidate",
412                async move {
                    // If the batches we are compacting were written with old versions of persist,
414                    // we may not have run UUIDs for them, meaning we don't have enough info to
415                    // safely compact them incrementally.
416                    let all_runs_have_uuids = req
417                        .inputs
418                        .iter()
419                        .all(|x| x.batch.runs().all(|(meta, _)| meta.id.is_some()));
420                    let all_runs_have_len = req
421                        .inputs
422                        .iter()
423                        .all(|x| x.batch.runs().all(|(meta, _)| meta.len.is_some()));
424
425                    let incremental_enabled = ENABLE_INCREMENTAL_COMPACTION
426                        .get(&machine_clone.applier.cfg)
427                        && all_runs_have_uuids
428                        && all_runs_have_len;
429                    let stream = Self::compact_stream(
430                        CompactConfig::new(&machine_clone.applier.cfg, machine_clone.shard_id()),
431                        Arc::clone(&machine_clone.applier.state_versions.blob),
432                        Arc::clone(&metrics_clone),
433                        Arc::clone(&machine_clone.applier.shard_metrics),
434                        Arc::clone(&machine_clone.isolated_runtime),
435                        req.clone(),
436                        compaction_schema,
437                        incremental_enabled,
438                    );
439
440                    let maintenance = if incremental_enabled {
441                        let mut maintenance = RoutineMaintenance::default();
442                        pin_mut!(stream);
443                        while let Some(res) = stream.next().await {
444                            let res = res?;
445                            let new_maintenance =
446                                Self::apply(res, &metrics_clone, &machine_clone).await?;
447                            maintenance.merge(new_maintenance);
448                        }
449                        maintenance
450                    } else {
451                        let res = Self::compact_all(stream, req.clone()).await?;
452                        Self::apply(
453                            FueledMergeRes {
454                                output: res.output,
455                                input: res.input,
456                                new_active_compaction: None,
457                            },
458                            &metrics_clone,
459                            &machine_clone,
460                        )
461                        .await?
462                    };
463
464                    Ok::<_, anyhow::Error>(maintenance)
465                }
466                .instrument(compact_span),
467            ),
468        )
469        .await;
470
471        metrics
472            .compaction
473            .seconds
474            .inc_by(start.elapsed().as_secs_f64());
475        let res = res
476            .map_err(|e| {
477                metrics.compaction.timed_out.inc();
478                anyhow!(
479                    "compaction timed out after {}s: {}",
480                    timeout.as_secs_f64(),
481                    e
482                )
483            })?
484            .map_err(|e| anyhow!(e))?;
485
486        match res {
487            Ok(maintenance) => Ok(maintenance),
488            Err(err) => {
489                metrics.compaction.failed.inc();
490                debug!(
491                    "compaction for {} failed: {}",
492                    machine.shard_id(),
493                    err.display_with_causes()
494                );
495                Err(err)
496            }
497        }
498    }
499
500    pub async fn compact_all(
501        stream: impl Stream<Item = Result<FueledMergeRes<T>, anyhow::Error>>,
502        req: CompactReq<T>,
503    ) -> Result<CompactRes<T>, anyhow::Error> {
504        pin_mut!(stream);
505
506        let mut all_parts = vec![];
507        let mut all_run_splits = vec![];
508        let mut all_run_meta = vec![];
509        let mut len = 0;
510
511        while let Some(res) = stream.next().await {
512            let res = res?.output;
513            let (parts, updates, run_meta, run_splits) =
514                (res.parts, res.len, res.run_meta, res.run_splits);
515
516            if updates == 0 {
517                continue;
518            }
519
520            let run_offset = all_parts.len();
521            if !all_parts.is_empty() {
522                all_run_splits.push(run_offset);
523            }
524            all_run_splits.extend(run_splits.iter().map(|r| r + run_offset));
525            all_run_meta.extend(run_meta);
526            all_parts.extend(parts);
527            len += updates;
528        }
529
530        let batches = req.inputs.iter().map(|x| x.id).collect::<BTreeSet<_>>();
531        let input = input_id_range(batches);
532
533        Ok(CompactRes {
534            output: HollowBatch::new(
535                req.desc.clone(),
536                all_parts,
537                len,
538                all_run_meta,
539                all_run_splits,
540            ),
541            input,
542        })
543    }
544
545    pub async fn apply(
546        res: FueledMergeRes<T>,
547        metrics: &Metrics,
548        machine: &Machine<K, V, T, D>,
549    ) -> Result<RoutineMaintenance, anyhow::Error> {
550        let (apply_merge_result, maintenance) = machine.merge_res(&res).await;
551
552        match &apply_merge_result {
553            ApplyMergeResult::AppliedExact => {
554                metrics.compaction.applied.inc();
555                metrics.compaction.applied_exact_match.inc();
556                machine.applier.shard_metrics.compaction_applied.inc();
557            }
558            ApplyMergeResult::AppliedSubset => {
559                metrics.compaction.applied.inc();
560                metrics.compaction.applied_subset_match.inc();
561                machine.applier.shard_metrics.compaction_applied.inc();
562            }
563            ApplyMergeResult::NotAppliedNoMatch
564            | ApplyMergeResult::NotAppliedInvalidSince
565            | ApplyMergeResult::NotAppliedTooManyUpdates => {
566                if let ApplyMergeResult::NotAppliedTooManyUpdates = &apply_merge_result {
567                    metrics.compaction.not_applied_too_many_updates.inc();
568                }
569                metrics.compaction.noop.inc();
570                let mut part_deletes = PartDeletes::default();
571                for part in &res.output.parts {
572                    part_deletes.add(part);
573                }
574                part_deletes
575                    .delete(
576                        machine.applier.state_versions.blob.as_ref(),
577                        machine.shard_id(),
578                        GC_BLOB_DELETE_CONCURRENCY_LIMIT.get(&machine.applier.cfg),
579                        &*metrics,
580                        &metrics.retries.external.compaction_noop_delete,
581                    )
582                    .await;
583            }
584        };
585
586        Ok(maintenance)
587    }
588
589    /// Compacts input batches in bounded memory.
590    ///
591    /// The memory bound is broken into pieces:
592    ///     1. in-progress work
593    ///     2. fetching parts from runs
594    ///     3. additional in-flight requests to Blob
595    ///
596    /// 1. In-progress work is bounded by 2 * [BatchBuilderConfig::blob_target_size]. This
597    ///    usage is met at two mutually exclusive moments:
598    ///   * When reading in a part, we hold the columnar format in memory while writing its
599    ///     contents into a heap.
600    ///   * When writing a part, we hold a temporary updates buffer while encoding/writing
601    ///     it into a columnar format for Blob.
602    ///
603    /// 2. When compacting runs, only 1 part from each one is held in memory at a time.
604    ///    Compaction will determine an appropriate number of runs to compact together
605    ///    given the memory bound and accounting for the reservation in (1). A minimum
606    ///    of 2 * [BatchBuilderConfig::blob_target_size] of memory is expected, to be
607    ///    able to at least have the capacity to compact two runs together at a time,
608    ///    and more runs will be compacted together if more memory is available.
609    ///
610    /// 3. If there is excess memory after accounting for (1) and (2), we increase the
611    ///    number of outstanding parts we can keep in-flight to Blob.
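    ///
    /// A worked example with hypothetical numbers (not the real defaults):
    ///
    /// ```text
    /// blob_target_size              = 128 MiB
    /// compaction_memory_bound_bytes = 1 GiB
    /// (1) in-progress reservation   = 2 * 128 MiB = 256 MiB
    /// (2) run reservation           = 1 GiB - 256 MiB = 768 MiB
    ///                                 (roughly six runs' largest parts held at once)
    /// (3) leftover within a chunk   -> extra outstanding parts in-flight to Blob
    /// ```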
612    pub fn compact_stream(
613        cfg: CompactConfig,
614        blob: Arc<dyn Blob>,
615        metrics: Arc<Metrics>,
616        shard_metrics: Arc<ShardMetrics>,
617        isolated_runtime: Arc<IsolatedRuntime>,
618        req: CompactReq<T>,
619        write_schemas: Schemas<K, V>,
620        incremental_enabled: bool,
621    ) -> impl Stream<Item = Result<FueledMergeRes<T>, anyhow::Error>> {
622        async_stream::stream! {
623            let () = Self::validate_req(&req)?;
624
625            // We introduced a fast-path optimization in https://github.com/MaterializeInc/materialize/pull/15363
626            // but had to revert it due to a very scary bug. Here we count how many of our compaction reqs
627            // could be eligible for the optimization to better understand whether it's worth trying to
628            // reintroduce it.
629            let mut single_nonempty_batch = None;
630            for batch in &req.inputs {
631                if batch.batch.len > 0 {
632                    match single_nonempty_batch {
633                        None => single_nonempty_batch = Some(batch),
634                        Some(_previous_nonempty_batch) => {
635                            single_nonempty_batch = None;
636                            break;
637                        }
638                    }
639                }
640            }
641            if let Some(single_nonempty_batch) = single_nonempty_batch {
642                if single_nonempty_batch.batch.run_splits.len() == 0
643                    && single_nonempty_batch.batch.desc.since() != &Antichain::from_elem(T::minimum())
644                {
645                    metrics.compaction.fast_path_eligible.inc();
646                }
647            }
648
            // Reserve space for the in-progress part to be held in both its in-memory and
            // columnar representations -
            let in_progress_part_reserved_memory_bytes = 2 * cfg.batch.blob_target_size;
            // - then the remaining memory will go towards pulling down as many runs as we can.
652            // We'll always do at least two runs per chunk, which means we may go over this limit
653            // if parts are large or the limit is low... though we do at least increment a metric
654            // when that happens.
655            let run_reserved_memory_bytes = cfg
656                .compaction_memory_bound_bytes
657                .saturating_sub(in_progress_part_reserved_memory_bytes);
658
659            let ordered_runs =
660                Self::flatten_runs(&req, cfg.batch.preferred_order, &*blob, &*metrics).await?;
661
662            let chunked_runs = Self::chunk_runs(
663                &ordered_runs,
664                &cfg,
665                &*metrics,
666                run_reserved_memory_bytes,
667            );
668            let total_chunked_runs = chunked_runs.len();
669
670            for (applied, (runs, run_chunk_max_memory_usage)) in
671                chunked_runs.into_iter().enumerate()
672            {
673                metrics.compaction.chunks_compacted.inc();
674                metrics
675                    .compaction
676                    .runs_compacted
677                    .inc_by(u64::cast_from(runs.len()));
678
                // Given the runs we actually have in our batch, we might have extra memory
                // available. We reserved enough space to always have 1 in-progress part in
                // flight, but if we have excess, we can use it to increase our write parallelism.
682                let extra_outstanding_parts = (run_reserved_memory_bytes
683                    .saturating_sub(run_chunk_max_memory_usage))
684                    / cfg.batch.blob_target_size;
685                let mut run_cfg = cfg.clone();
686                run_cfg.batch.batch_builder_max_outstanding_parts = 1 + extra_outstanding_parts;
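                // E.g. (hypothetical sizes): 768 MiB reserved for runs, largest parts in this
                // chunk summing to 384 MiB, and a 128 MiB blob_target_size give
                // (768 - 384) / 128 = 3 extra outstanding parts, for 4 in total.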
687
688                let (batch_ids, descriptions): (BTreeSet<_>, Vec<_>) = runs.iter()
689                    .map(|(run_id, desc, _, _)| (run_id.0, *desc))
690                    .unzip();
691
692                let input = if incremental_enabled {
693                    let run_ids = runs.iter()
694                        .map(|(run_id, _, _, _)| run_id.1.expect("run_id should be present"))
695                        .collect::<BTreeSet<_>>();
696                    match batch_ids.iter().exactly_one().ok() {
697                        Some(batch_id) => {
698                            CompactionInput::PartialBatch(
699                                *batch_id,
700                                run_ids
701                            )
702                        }
703                        None => input_id_range(batch_ids),
704                    }
705                } else {
706                    input_id_range(batch_ids)
707                };
708
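                // For incremental compaction the output description is narrowed to cover
                // exactly the runs in this chunk: e.g. runs whose batches cover [0, 5) and
                // [5, 10) yield a combined description of [0, 10). Non-incremental
                // compaction always uses the full request description.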
709                let desc = if incremental_enabled {
710                    let desc_lower = descriptions
711                        .iter()
712                        .map(|desc| desc.lower())
713                        .cloned()
714                        .reduce(|a, b| a.meet(&b))
715                        .unwrap_or_else(|| req.desc.lower().clone());
716
717                    let desc_upper = descriptions
718                        .iter()
719                        .map(|desc| desc.upper())
720                        .cloned()
721                        .reduce(|a, b| a.join(&b))
722                        .unwrap_or_else(|| req.desc.upper().clone());
723
724                    Description::new(desc_lower, desc_upper, req.desc.since().clone())
725                } else {
726                    req.desc.clone()
727                };
728
729                let runs = runs.iter()
730                    .map(|(_, desc, meta, run)| (*desc, *meta, *run))
731                    .collect::<Vec<_>>();
732
733                let batch = Self::compact_runs(
734                    &run_cfg,
735                    &req.shard_id,
736                    &desc,
737                    runs,
738                    Arc::clone(&blob),
739                    Arc::clone(&metrics),
740                    Arc::clone(&shard_metrics),
741                    Arc::clone(&isolated_runtime),
742                    write_schemas.clone(),
743                )
744                .await?;
745
746                assert!(
747                    (batch.len == 0 && batch.parts.len() == 0) || (batch.len > 0 && batch.parts.len() > 0),
748                    "updates={}, parts={}",
749                    batch.len,
750                    batch.parts.len(),
751                );
752
753                // Set up active compaction metadata
754                let clock = cfg.now.clone();
755                let active_compaction = if applied < total_chunked_runs - 1 {
756                    Some(ActiveCompaction { start_ms: clock() })
757                } else {
758                    None
759                };
760
761                let res = CompactRes {
762                    output: batch,
763                    input,
764                };
765
766                let res = FueledMergeRes {
767                    output: res.output,
768                    new_active_compaction: active_compaction,
769                    input: res.input,
770                };
771
772                yield Ok(res);
773            }
774        }
775    }
776
777    /// Compacts the input batches together, returning a single compacted batch.
778    /// Under the hood this just calls [Self::compact_stream] and
779    /// [Self::compact_all], but it is a convenience method that allows
780    /// the caller to not have to deal with the streaming API.
781    pub async fn compact(
782        cfg: CompactConfig,
783        blob: Arc<dyn Blob>,
784        metrics: Arc<Metrics>,
785        shard_metrics: Arc<ShardMetrics>,
786        isolated_runtime: Arc<IsolatedRuntime>,
787        req: CompactReq<T>,
788        write_schemas: Schemas<K, V>,
789    ) -> Result<CompactRes<T>, anyhow::Error> {
790        let stream = Self::compact_stream(
791            cfg,
792            Arc::clone(&blob),
793            Arc::clone(&metrics),
794            Arc::clone(&shard_metrics),
795            Arc::clone(&isolated_runtime),
796            req.clone(),
797            write_schemas,
798            false,
799        );
800
801        Self::compact_all(stream, req).await
802    }
803
804    /// Chunks runs with the following rules:
    /// 1. Runs from multiple batches are allowed to be mixed as long as _every_ run in
    ///    the batch is present in the chunk.
807    /// 2. Otherwise runs are split into chunks of runs from a single batch.
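    ///
    /// For example (hypothetical sizes, with a 1 GiB reservation): batch A with two
    /// 300 MiB runs and batch B with one 300 MiB run fit together in a single mixed
    /// chunk, while a following batch C with four 400 MiB runs is flushed separately
    /// and split into single-batch chunks of two runs each.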
808    fn chunk_runs<'a>(
809        ordered_runs: &'a [(
810            RunLocation,
811            &'a Description<T>,
812            &'a RunMeta,
813            Cow<'a, [RunPart<T>]>,
814        )],
815        cfg: &CompactConfig,
816        metrics: &Metrics,
817        run_reserved_memory_bytes: usize,
818    ) -> Vec<(
819        Vec<(
820            &'a RunLocation,
821            &'a Description<T>,
822            &'a RunMeta,
823            &'a [RunPart<T>],
824        )>,
825        usize,
826    )> {
827        // Group runs by SpineId
828        let grouped: BTreeMap<SpineId, Vec<_>> = ordered_runs
829            .iter()
830            .map(|(run_id, desc, meta, parts)| (run_id.0, (run_id, *desc, *meta, &**parts)))
831            .fold(BTreeMap::new(), |mut acc, item| {
832                acc.entry(item.0).or_default().push(item.1);
833                acc
834            });
835
836        let mut grouped = grouped.into_iter().peekable();
837
838        let mut chunks = vec![];
839        let mut current_chunk = vec![];
840        let mut current_chunk_max_memory_usage = 0;
841        let mut mem_violation = false;
842
843        fn max_part_bytes<T>(parts: &[RunPart<T>], cfg: &CompactConfig) -> usize {
844            parts
845                .iter()
846                .map(|p| p.max_part_bytes())
847                .max()
848                .unwrap_or(cfg.batch.blob_target_size)
849        }
850
851        while let Some((_spine_id, runs)) = grouped.next() {
852            let batch_size = runs
853                .iter()
854                .map(|(_, _, _, parts)| max_part_bytes(parts, cfg))
855                .sum::<usize>();
856
857            let num_runs = runs.len();
858
859            // Determine if adding this batch poses a memory violation risk.
860            // If we have a single run which is greater than the reserved memory, we need to be careful.
861            // We need to force it to be merged with another batch (if present)
862            // or else compaction won't make progress.
863            let memory_violation_risk =
864                batch_size > run_reserved_memory_bytes && num_runs == 1 && current_chunk.is_empty();
865
866            if mem_violation && num_runs == 1 {
867                // After a memory violation, combine the next single run (if present)
868                // into the chunk, forcing us to make progress.
869                current_chunk.extend(runs.clone());
870                current_chunk_max_memory_usage += batch_size;
871                mem_violation = false;
872                continue;
873            } else if current_chunk_max_memory_usage + batch_size <= run_reserved_memory_bytes {
874                // If the whole batch fits into the current mixed-batch chunk, add it and consider continuing to mix.
875                current_chunk.extend(runs);
876                current_chunk_max_memory_usage += batch_size;
877                continue;
878            } else if memory_violation_risk {
879                // If adding this single-run batch would cause a memory violation, add it anyway.
880                metrics.compaction.memory_violations.inc();
881                mem_violation = true;
882                current_chunk.extend(runs.clone());
883                current_chunk_max_memory_usage += batch_size;
884                continue;
885            }
886
887            // Otherwise, we cannot mix this batch partially. Flush any existing mixed chunk first.
888            if !current_chunk.is_empty() {
889                chunks.push((
890                    std::mem::take(&mut current_chunk),
891                    current_chunk_max_memory_usage,
892                ));
893                mem_violation = false;
894                current_chunk_max_memory_usage = 0;
895            }
896
897            // Now process this batch alone, splitting into single-batch chunks as needed.
898            let mut run_iter = runs.into_iter().peekable();
899            debug_assert!(current_chunk.is_empty());
900            debug_assert_eq!(current_chunk_max_memory_usage, 0);
901
902            while let Some((run_id, desc, meta, parts)) = run_iter.next() {
903                let run_size = max_part_bytes(parts, cfg);
904                current_chunk.push((run_id, desc, meta, parts));
905                current_chunk_max_memory_usage += run_size;
906
907                if let Some((_, _, _, next_parts)) = run_iter.peek() {
908                    let next_size = max_part_bytes(next_parts, cfg);
909                    if current_chunk_max_memory_usage + next_size > run_reserved_memory_bytes {
910                        // If the current chunk only has one run, record a memory violation metric.
911                        if current_chunk.len() == 1 {
912                            metrics.compaction.memory_violations.inc();
913                            continue;
914                        }
915                        // Flush the current chunk and start a new one.
916                        chunks.push((
917                            std::mem::take(&mut current_chunk),
918                            current_chunk_max_memory_usage,
919                        ));
920                        current_chunk_max_memory_usage = 0;
921                    }
922                }
923            }
924
925            if !current_chunk.is_empty() {
926                chunks.push((
927                    std::mem::take(&mut current_chunk),
928                    current_chunk_max_memory_usage,
929                ));
930                current_chunk_max_memory_usage = 0;
931            }
932        }
933
934        // If we ended with a mixed-batch chunk in progress, flush it.
935        if !current_chunk.is_empty() {
936            chunks.push((current_chunk, current_chunk_max_memory_usage));
937        }
938
939        chunks
940    }
941
942    /// Flattens the runs in the input batches into a single ordered list of runs.
943    async fn flatten_runs<'a>(
944        req: &'a CompactReq<T>,
945        target_order: RunOrder,
946        blob: &'a dyn Blob,
947        metrics: &'a Metrics,
948    ) -> anyhow::Result<
949        Vec<(
950            RunLocation,
951            &'a Description<T>,
952            &'a RunMeta,
953            Cow<'a, [RunPart<T>]>,
954        )>,
955    > {
956        let total_number_of_runs = req
957            .inputs
958            .iter()
959            .map(|x| x.batch.run_splits.len() + 1)
960            .sum::<usize>();
961
962        let batch_runs: Vec<_> = req
963            .inputs
964            .iter()
965            .map(|x| (x.id, &x.batch.desc, x.batch.runs()))
966            .collect();
967
968        let mut ordered_runs = Vec::with_capacity(total_number_of_runs);
969        for (spine_id, desc, runs) in batch_runs {
970            for (meta, run) in runs {
971                let run_id = RunLocation(spine_id, meta.id);
972                let same_order = meta.order.unwrap_or(RunOrder::Codec) == target_order;
973                if same_order {
974                    ordered_runs.push((run_id, desc, meta, Cow::Borrowed(run)));
975                } else {
976                    // The downstream consolidation step will handle a long run that's not in
977                    // the desired order by splitting it up into many single-element runs. This preserves
978                    // correctness, but it means that we may end up needing to iterate through
979                    // many more parts concurrently than expected, increasing memory use. Instead,
980                    // we break up those runs into individual batch parts, fetching hollow runs as
981                    // necessary, before they're grouped together to be passed to consolidation.
982                    // The downside is that this breaks the usual property that compaction produces
983                    // fewer runs than it takes in. This should generally be resolved by future
984                    // runs of compaction.
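                    // For example, a run of ten parts written in `Codec` order is re-enqueued
                    // here as ten single-part runs when the target order is structured.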
985                    for part in run {
986                        let mut batch_parts = pin!(part.part_stream(req.shard_id, blob, metrics));
987                        while let Some(part) = batch_parts.next().await {
988                            ordered_runs.push((
989                                run_id,
990                                desc,
991                                meta,
992                                Cow::Owned(vec![RunPart::Single(part?.into_owned())]),
993                            ));
994                        }
995                    }
996                }
997            }
998        }
999
1000        Ok(ordered_runs)
1001    }
1002
1003    /// Compacts runs together. If the input runs are sorted, a single run will be created as output.
1004    ///
1005    /// Maximum possible memory usage is `(# runs + 2) * [crate::PersistConfig::blob_target_size]`
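    /// (one part held in memory per input run, plus two parts' worth of in-progress work).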
1006    pub(crate) async fn compact_runs(
1007        cfg: &CompactConfig,
1008        shard_id: &ShardId,
1009        desc: &Description<T>,
1010        runs: Vec<(&Description<T>, &RunMeta, &[RunPart<T>])>,
1011        blob: Arc<dyn Blob>,
1012        metrics: Arc<Metrics>,
1013        shard_metrics: Arc<ShardMetrics>,
1014        isolated_runtime: Arc<IsolatedRuntime>,
1015        write_schemas: Schemas<K, V>,
1016    ) -> Result<HollowBatch<T>, anyhow::Error> {
1017        // TODO: Figure out a more principled way to allocate our memory budget.
1018        // Currently, we give any excess budget to write parallelism. If we had
1019        // to pick between 100% towards writes vs 100% towards reads, then reads
1020        // is almost certainly better, but the ideal is probably somewhere in
1021        // between the two.
1022        //
        // For now, invent some extra budget out of thin air for prefetch.
1024        let prefetch_budget_bytes = 2 * cfg.batch.blob_target_size;
1025
1026        let mut timings = Timings::default();
1027
1028        let mut batch_cfg = cfg.batch.clone();
1029
1030        // Use compaction as a method of getting inline writes out of state, to
1031        // make room for more inline writes. We could instead do this at the end
1032        // of compaction by flushing out the batch, but doing it here based on
1033        // the config allows BatchBuilder to do its normal pipelining of writes.
1034        batch_cfg.inline_writes_single_max_bytes = 0;
1035
1036        let parts = BatchParts::new_ordered::<D>(
1037            batch_cfg,
1038            cfg.batch.preferred_order,
1039            Arc::clone(&metrics),
1040            Arc::clone(&shard_metrics),
1041            *shard_id,
1042            Arc::clone(&blob),
1043            Arc::clone(&isolated_runtime),
1044            &metrics.compaction.batch,
1045        );
1046        let mut batch = BatchBuilderInternal::<K, V, T, D>::new(
1047            cfg.batch.clone(),
1048            parts,
1049            Arc::clone(&metrics),
1050            write_schemas.clone(),
1051            Arc::clone(&blob),
1052            shard_id.clone(),
1053            cfg.version.clone(),
1054        );
1055
1056        let mut consolidator = Consolidator::new(
1057            format!(
1058                "{}[lower={:?},upper={:?}]",
1059                shard_id,
1060                desc.lower().elements(),
1061                desc.upper().elements()
1062            ),
1063            cfg.fetch_config.clone(),
1064            *shard_id,
1065            StructuredSort::<K, V, T, D>::new(write_schemas.clone()),
1066            blob,
1067            Arc::clone(&metrics),
1068            shard_metrics,
1069            metrics.read.compaction.clone(),
1070            FetchBatchFilter::Compaction {
1071                since: desc.since().clone(),
1072            },
1073            None,
1074            prefetch_budget_bytes,
1075        );
1076
1077        for (desc, meta, parts) in runs {
1078            consolidator.enqueue_run(desc, meta, parts.iter().cloned());
1079        }
1080
1081        let remaining_budget = consolidator.start_prefetches();
1082        if remaining_budget.is_none() {
1083            metrics.compaction.not_all_prefetched.inc();
1084        }
1085
1086        loop {
1087            let mut chunks = vec![];
1088            let mut total_bytes = 0;
1089            // We attempt to pull chunks out of the consolidator that match our target size,
1090            // but it's possible that we may get smaller chunks... for example, if not all
1091            // parts have been fetched yet. Loop until we've got enough data to justify flushing
1092            // it out to blob (or we run out of data.)
1093            while total_bytes < cfg.batch.blob_target_size {
1094                let fetch_start = Instant::now();
1095                let Some(chunk) = consolidator
1096                    .next_chunk(
1097                        cfg.compaction_yield_after_n_updates,
1098                        cfg.batch.blob_target_size - total_bytes,
1099                    )
1100                    .await?
1101                else {
1102                    break;
1103                };
1104                timings.part_fetching += fetch_start.elapsed();
1105                total_bytes += chunk.goodbytes();
1106                chunks.push(chunk);
1107                tokio::task::yield_now().await;
1108            }
1109
1110            // In the hopefully-common case of a single chunk, this will not copy.
1111            let Some(updates) = Part::concat(&chunks).expect("compaction produces well-typed data")
1112            else {
1113                break;
1114            };
1115            batch.flush_part(desc.clone(), updates).await;
1116        }
1117        let mut batch = batch.finish(desc.clone()).await?;
1118
        // We use compaction as a method of getting inline writes out of state,
        // to make room for more inline writes. This happens above by zeroing
        // out the inline writes threshold in the batch config. This is a bit
        // action-at-a-distance, so defensively detect if this breaks here and
        // log and correct it if so.
1124        let has_inline_parts = batch.batch.parts.iter().any(|x| x.is_inline());
1125        if has_inline_parts {
1126            error!(%shard_id, ?cfg, "compaction result unexpectedly had inline writes");
1127            let () = batch
1128                .flush_to_blob(
1129                    &cfg.batch,
1130                    &metrics.compaction.batch,
1131                    &isolated_runtime,
1132                    &write_schemas,
1133                )
1134                .await;
1135        }
1136
1137        timings.record(&metrics);
1138        Ok(batch.into_hollow_batch())
1139    }
1140
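    /// Validates that a [CompactReq]'s inputs tile its output description: each
    /// input's lower must match the previous input's upper, together they must
    /// cover exactly `[desc.lower(), desc.upper())`, and no input may have a
    /// since in advance of the output since.
    ///
    /// For example (hypothetical bounds): inputs covering `[0, 3)`, `[3, 7)`, and
    /// `[7, 10)` are a valid merge for an output description over `[0, 10)`.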
1141    fn validate_req(req: &CompactReq<T>) -> Result<(), anyhow::Error> {
1142        let mut frontier = req.desc.lower();
1143        for input in req.inputs.iter() {
1144            if PartialOrder::less_than(req.desc.since(), input.batch.desc.since()) {
1145                return Err(anyhow!(
1146                    "output since {:?} must be at or in advance of input since {:?}",
1147                    req.desc.since(),
1148                    input.batch.desc.since()
1149                ));
1150            }
1151            if frontier != input.batch.desc.lower() {
1152                return Err(anyhow!(
1153                    "invalid merge of non-consecutive batches {:?} vs {:?}",
1154                    frontier,
1155                    input.batch.desc.lower()
1156                ));
1157            }
1158            frontier = input.batch.desc.upper();
1159        }
1160        if frontier != req.desc.upper() {
1161            return Err(anyhow!(
1162                "invalid merge of non-consecutive batches {:?} vs {:?}",
1163                frontier,
1164                req.desc.upper()
1165            ));
1166        }
1167        Ok(())
1168    }
1169}
1170
1171#[derive(Debug, Default)]
1172struct Timings {
1173    part_fetching: Duration,
1174    heap_population: Duration,
1175}
1176
1177impl Timings {
1178    fn record(self, metrics: &Metrics) {
1179        // intentionally deconstruct so we don't forget to consider each field
1180        let Timings {
1181            part_fetching,
1182            heap_population,
1183        } = self;
1184
1185        metrics
1186            .compaction
1187            .steps
1188            .part_fetch_seconds
1189            .inc_by(part_fetching.as_secs_f64());
1190        metrics
1191            .compaction
1192            .steps
1193            .heap_population_seconds
1194            .inc_by(heap_population.as_secs_f64());
1195    }
1196}
1197
1198#[cfg(test)]
1199mod tests {
1200    use mz_dyncfg::ConfigUpdates;
1201    use mz_ore::{assert_contains, assert_err};
1202    use mz_persist_types::codec_impls::StringSchema;
1203    use timely::progress::Antichain;
1204
1205    use crate::PersistLocation;
1206    use crate::batch::BLOB_TARGET_SIZE;
1207    use crate::internal::trace::SpineId;
1208    use crate::tests::{all_ok, expect_fetch_part, new_test_client_cache};
1209
1210    use super::*;
1211
1212    // A regression test for a bug caught during development of materialize#13160 (never
1213    // made it to main) where batches written by compaction would always have a
1214    // since of the minimum timestamp.
1215    #[mz_persist_proc::test(tokio::test)]
1216    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
1217    async fn regression_minimum_since(dyncfgs: ConfigUpdates) {
1218        let data = vec![
1219            (("0".to_owned(), "zero".to_owned()), 0, 1),
1220            (("0".to_owned(), "zero".to_owned()), 1, -1),
1221            (("1".to_owned(), "one".to_owned()), 1, 1),
1222        ];
1223
1224        let cache = new_test_client_cache(&dyncfgs);
1225        cache.cfg.set_config(&BLOB_TARGET_SIZE, 100);
1226        let (mut write, _) = cache
1227            .open(PersistLocation::new_in_mem())
1228            .await
1229            .expect("client construction failed")
1230            .expect_open::<String, String, u64, i64>(ShardId::new())
1231            .await;
1232        let b0 = write
1233            .expect_batch(&data[..1], 0, 1)
1234            .await
1235            .into_hollow_batch();
1236        let b1 = write
1237            .expect_batch(&data[1..], 1, 2)
1238            .await
1239            .into_hollow_batch();
1240
1241        let req = CompactReq {
1242            shard_id: write.machine.shard_id(),
1243            desc: Description::new(
1244                b0.desc.lower().clone(),
1245                b1.desc.upper().clone(),
1246                Antichain::from_elem(10u64),
1247            ),
1248            inputs: vec![
1249                IdHollowBatch {
1250                    batch: Arc::new(b0),
1251                    id: SpineId(0, 1),
1252                },
1253                IdHollowBatch {
1254                    batch: Arc::new(b1),
1255                    id: SpineId(1, 2),
1256                },
1257            ],
1258        };
1259        let schemas = Schemas {
1260            id: None,
1261            key: Arc::new(StringSchema),
1262            val: Arc::new(StringSchema),
1263        };
1264        let res = Compactor::<String, String, u64, i64>::compact(
1265            CompactConfig::new(&write.cfg, write.shard_id()),
1266            Arc::clone(&write.blob),
1267            Arc::clone(&write.metrics),
1268            write.metrics.shards.shard(&write.machine.shard_id(), ""),
1269            Arc::new(IsolatedRuntime::new_for_tests()),
1270            req.clone(),
1271            schemas.clone(),
1272        )
1273        .await
1274        .expect("compaction failed");
1275
1276        assert_eq!(res.output.desc, req.desc);
1277        assert_eq!(res.output.len, 1);
1278        assert_eq!(res.output.part_count(), 1);
1279        let part = res.output.parts[0].expect_hollow_part();
1280        let (part, updates) = expect_fetch_part(
1281            write.blob.as_ref(),
1282            &part.key.complete(&write.machine.shard_id()),
1283            &write.metrics,
1284            &schemas,
1285        )
1286        .await;
1287        assert_eq!(part.desc, res.output.desc);
1288        assert_eq!(updates, all_ok(&data, 10));
1289    }
1290
1291    #[mz_persist_proc::test(tokio::test)]
1292    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
1293    async fn disable_compaction(dyncfgs: ConfigUpdates) {
1294        let data = [
1295            (("0".to_owned(), "zero".to_owned()), 0, 1),
1296            (("0".to_owned(), "zero".to_owned()), 1, -1),
1297            (("1".to_owned(), "one".to_owned()), 1, 1),
1298        ];
1299
1300        let cache = new_test_client_cache(&dyncfgs);
1301        cache.cfg.set_config(&BLOB_TARGET_SIZE, 100);
1302        let (mut write, _) = cache
1303            .open(PersistLocation::new_in_mem())
1304            .await
1305            .expect("client construction failed")
1306            .expect_open::<String, String, u64, i64>(ShardId::new())
1307            .await;
1308        let b0 = write
1309            .expect_batch(&data[..1], 0, 1)
1310            .await
1311            .into_hollow_batch();
1312        let b1 = write
1313            .expect_batch(&data[1..], 1, 2)
1314            .await
1315            .into_hollow_batch();
1316
1317        let req = CompactReq {
1318            shard_id: write.machine.shard_id(),
1319            desc: Description::new(
1320                b0.desc.lower().clone(),
1321                b1.desc.upper().clone(),
1322                Antichain::from_elem(10u64),
1323            ),
1324            inputs: vec![
1325                IdHollowBatch {
1326                    batch: Arc::new(b0),
1327                    id: SpineId(0, 1),
1328                },
1329                IdHollowBatch {
1330                    batch: Arc::new(b1),
1331                    id: SpineId(1, 2),
1332                },
1333            ],
1334        };
1335        write.cfg.set_config(&COMPACTION_HEURISTIC_MIN_INPUTS, 1);
1336        let compactor = write.compact.as_ref().expect("compaction hard disabled");
1337
1338        write.cfg.disable_compaction();
1339        let result = compactor
1340            .compact_and_apply_background(req.clone(), &write.machine)
1341            .expect("listener")
1342            .await
1343            .expect("channel closed");
1344        assert_err!(result);
1345        assert_contains!(result.unwrap_err().to_string(), "compaction disabled");
1346
1347        write.cfg.enable_compaction();
1348        compactor
1349            .compact_and_apply_background(req, &write.machine)
1350            .expect("listener")
1351            .await
1352            .expect("channel closed")
1353            .expect("compaction success");
1354
1355        // Make sure our CYA dyncfg works.
1356        let data2 = [
1357            (("2".to_owned(), "two".to_owned()), 2, 1),
1358            (("2".to_owned(), "two".to_owned()), 3, -1),
1359            (("3".to_owned(), "three".to_owned()), 3, 1),
1360        ];
1361
1362        let b2 = write
1363            .expect_batch(&data2[..1], 2, 3)
1364            .await
1365            .into_hollow_batch();
1366        let b3 = write
1367            .expect_batch(&data2[1..], 3, 4)
1368            .await
1369            .into_hollow_batch();
1370
1371        let req = CompactReq {
1372            shard_id: write.machine.shard_id(),
1373            desc: Description::new(
1374                b2.desc.lower().clone(),
1375                b3.desc.upper().clone(),
1376                Antichain::from_elem(20u64),
1377            ),
1378            inputs: vec![
1379                IdHollowBatch {
1380                    batch: Arc::new(b2),
1381                    id: SpineId(0, 1),
1382                },
1383                IdHollowBatch {
1384                    batch: Arc::new(b3),
1385                    id: SpineId(1, 2),
1386                },
1387            ],
1388        };
1389        let compactor = write.compact.as_ref().expect("compaction hard disabled");
1390
1391        // When the dyncfg is set to false we should ignore the process flag.
1392        write.cfg.set_config(&COMPACTION_CHECK_PROCESS_FLAG, false);
1393        write.cfg.disable_compaction();
1394        // Compaction still succeeded!
1395        compactor
1396            .compact_and_apply_background(req, &write.machine)
1397            .expect("listener")
1398            .await
1399            .expect("channel closed")
1400            .expect("compaction success");
1401    }
1402}