mz_persist_client/internal/compact.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::borrow::Cow;
use std::collections::VecDeque;
use std::fmt::Debug;
use std::marker::PhantomData;
use std::pin::pin;
use std::sync::Arc;
use std::time::{Duration, Instant};

use anyhow::anyhow;
use differential_dataflow::difference::Semigroup;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::trace::Description;
use futures_util::{StreamExt, TryFutureExt};
use mz_dyncfg::Config;
use mz_ore::cast::CastFrom;
use mz_ore::error::ErrorExt;
use mz_persist::location::Blob;
use mz_persist_types::part::Part;
use mz_persist_types::{Codec, Codec64};
use timely::PartialOrder;
use timely::progress::{Antichain, Timestamp};
use tokio::sync::mpsc::Sender;
use tokio::sync::{TryAcquireError, mpsc, oneshot};
use tracing::{Instrument, Span, debug, debug_span, error, trace, warn};

use crate::async_runtime::IsolatedRuntime;
use crate::batch::{BatchBuilderConfig, BatchBuilderInternal, BatchParts, PartDeletes};
use crate::cfg::{
    COMPACTION_HEURISTIC_MIN_INPUTS, COMPACTION_HEURISTIC_MIN_PARTS,
    COMPACTION_HEURISTIC_MIN_UPDATES, COMPACTION_MEMORY_BOUND_BYTES,
    GC_BLOB_DELETE_CONCURRENCY_LIMIT, MiB,
};
use crate::fetch::FetchBatchFilter;
use crate::internal::encoding::Schemas;
use crate::internal::gc::GarbageCollector;
use crate::internal::machine::Machine;
use crate::internal::maintenance::RoutineMaintenance;
use crate::internal::metrics::ShardMetrics;
use crate::internal::state::{HollowBatch, RunMeta, RunOrder, RunPart};
use crate::internal::trace::{ApplyMergeResult, FueledMergeRes};
use crate::iter::{Consolidator, StructuredSort};
use crate::{Metrics, PersistConfig, ShardId};

/// A request for compaction.
///
/// This is similar to FueledMergeReq, but intentionally a different type. If we
/// move compaction to an rpc server, this one will become a protobuf; the type
/// parameters will become names of codecs to look up in some registry.
#[derive(Debug, Clone)]
pub struct CompactReq<T> {
    /// The shard the input and output batches belong to.
    pub shard_id: ShardId,
    /// A description for the output batch.
    pub desc: Description<T>,
    /// The updates to include in the output batch. Any data in these outside
    /// of the output description's bounds should be ignored.
    pub inputs: Vec<HollowBatch<T>>,
}

/// A response from compaction.
#[derive(Debug)]
pub struct CompactRes<T> {
    /// The compacted batch.
    pub output: HollowBatch<T>,
}

/// A snapshot of dynamic configs to make it easier to reason about an
/// individual run of compaction.
#[derive(Debug, Clone)]
pub struct CompactConfig {
    pub(crate) compaction_memory_bound_bytes: usize,
    pub(crate) compaction_yield_after_n_updates: usize,
    pub(crate) version: semver::Version,
    pub(crate) batch: BatchBuilderConfig,
}

impl CompactConfig {
    /// Initialize the compaction config from Persist configuration.
    pub fn new(value: &PersistConfig, shard_id: ShardId) -> Self {
        CompactConfig {
            compaction_memory_bound_bytes: COMPACTION_MEMORY_BOUND_BYTES.get(value),
            compaction_yield_after_n_updates: value.compaction_yield_after_n_updates,
            version: value.build_version.clone(),
            batch: BatchBuilderConfig::new(value, shard_id),
        }
    }
}

/// A service for performing physical and logical compaction.
///
/// This will possibly be called over RPC in the future. Physical compaction is
/// merging adjacent batches. Logical compaction is advancing timestamps to a
/// new since and consolidating the resulting updates.
#[derive(Debug)]
pub struct Compactor<K, V, T, D> {
    cfg: PersistConfig,
    metrics: Arc<Metrics>,
    sender: Sender<(
        Instant,
        CompactReq<T>,
        Machine<K, V, T, D>,
        oneshot::Sender<Result<ApplyMergeResult, anyhow::Error>>,
    )>,
    _phantom: PhantomData<fn() -> D>,
}

impl<K, V, T, D> Clone for Compactor<K, V, T, D> {
    fn clone(&self) -> Self {
        Compactor {
            cfg: self.cfg.clone(),
            metrics: Arc::clone(&self.metrics),
            sender: self.sender.clone(),
            _phantom: Default::default(),
        }
    }
}

/// In Compactor::compact_and_apply_background, the minimum amount of time to
/// allow a compaction request to run before timing it out. A request may be
/// given a timeout greater than this value depending on the inputs' size.
pub(crate) const COMPACTION_MINIMUM_TIMEOUT: Config<Duration> = Config::new(
    "persist_compaction_minimum_timeout",
    Duration::from_secs(90),
    "\
    The minimum amount of time to allow a persist compaction request to run \
    before timing it out (Materialize).",
);

pub(crate) const COMPACTION_USE_MOST_RECENT_SCHEMA: Config<bool> = Config::new(
    "persist_compaction_use_most_recent_schema",
    true,
    "\
    Use the most recent schema from all the Runs that are currently being \
    compacted, instead of the schema on the current write handle (Materialize).
    ",
);

pub(crate) const COMPACTION_CHECK_PROCESS_FLAG: Config<bool> = Config::new(
    "persist_compaction_check_process_flag",
    true,
    "Whether Compactor will obey the process_requests flag in PersistConfig, \
        which allows dynamically disabling compaction. If false, all compaction requests will be processed.",
);

impl<K, V, T, D> Compactor<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64 + Sync,
    D: Semigroup + Ord + Codec64 + Send + Sync,
{
    pub fn new(
        cfg: PersistConfig,
        metrics: Arc<Metrics>,
        write_schemas: Schemas<K, V>,
        gc: GarbageCollector<K, V, T, D>,
    ) -> Self {
        let (compact_req_sender, mut compact_req_receiver) = mpsc::channel::<(
            Instant,
            CompactReq<T>,
            Machine<K, V, T, D>,
            oneshot::Sender<Result<ApplyMergeResult, anyhow::Error>>,
        )>(cfg.compaction_queue_size);
        let concurrency_limit = Arc::new(tokio::sync::Semaphore::new(
            cfg.compaction_concurrency_limit,
        ));
        let check_process_requests = COMPACTION_CHECK_PROCESS_FLAG.handle(&cfg.configs);
        let process_requests = Arc::clone(&cfg.compaction_process_requests);

        // spin off a single task responsible for executing compaction requests.
        // work is enqueued into the task through a channel
        let _worker_handle = mz_ore::task::spawn(|| "PersistCompactionScheduler", async move {
            while let Some((enqueued, req, machine, completer)) = compact_req_receiver.recv().await
            {
                assert_eq!(req.shard_id, machine.shard_id());
                let metrics = Arc::clone(&machine.applier.metrics);

                // Only allow skipping compaction requests if the dyncfg is enabled.
                if check_process_requests.get()
                    && !process_requests.load(std::sync::atomic::Ordering::Relaxed)
                {
                    // Respond to the requester, track in our metrics, and log
                    // that compaction is disabled.
                    let _ = completer.send(Err(anyhow::anyhow!("compaction disabled")));
                    metrics.compaction.disabled.inc();
                    tracing::warn!(shard_id = ?req.shard_id, "Dropping compaction request on the floor.");

                    continue;
                }

                let permit = {
                    let inner = Arc::clone(&concurrency_limit);
                    // perform a non-blocking attempt to acquire a permit so we can
                    // record how often we're ever blocked on the concurrency limit
                    match inner.try_acquire_owned() {
                        Ok(permit) => permit,
                        Err(TryAcquireError::NoPermits) => {
                            metrics.compaction.concurrency_waits.inc();
                            Arc::clone(&concurrency_limit)
                                .acquire_owned()
                                .await
                                .expect("semaphore is never closed")
                        }
                        Err(TryAcquireError::Closed) => {
                            // should never happen in practice. the semaphore is
                            // never explicitly closed, nor will it close on Drop
                            warn!("semaphore for shard {} is closed", machine.shard_id());
                            continue;
                        }
                    }
                };
                metrics
                    .compaction
                    .queued_seconds
                    .inc_by(enqueued.elapsed().as_secs_f64());

                let write_schemas = write_schemas.clone();

                let compact_span =
                    debug_span!(parent: None, "compact::apply", shard_id=%machine.shard_id());
                compact_span.follows_from(&Span::current());
                let gc = gc.clone();
                mz_ore::task::spawn(|| "PersistCompactionWorker", async move {
                    let res = Self::compact_and_apply(&machine, req, write_schemas)
                        .instrument(compact_span)
                        .await;
                    let res = res.map(|(res, maintenance)| {
                        maintenance.start_performing(&machine, &gc);
                        res
                    });

                    // we can safely ignore errors here, it's possible the caller
                    // wasn't interested in waiting and dropped their receiver
                    let _ = completer.send(res);

                    // moves `permit` into async scope so it can be dropped upon completion
                    drop(permit);
                });
            }
        });

        Compactor {
            cfg,
            metrics,
            sender: compact_req_sender,
            _phantom: PhantomData,
        }
    }

    /// Enqueues a [CompactReq] to be consumed by the compaction background task when available.
    ///
    /// Returns a receiver that indicates when compaction has completed. The receiver can be
    /// safely dropped at any time if the caller does not wish to wait on completion.
    pub fn compact_and_apply_background(
        &self,
        req: CompactReq<T>,
        machine: &Machine<K, V, T, D>,
    ) -> Option<oneshot::Receiver<Result<ApplyMergeResult, anyhow::Error>>> {
        // Run some initial heuristics to ignore some requests for compaction.
        // We don't gain much from e.g. compacting two very small batches that
        // were just written, but it does result in non-trivial blob traffic
        // (especially in aggregate). This heuristic is something we'll need to
        // tune over time.
        let should_compact = req.inputs.len() >= COMPACTION_HEURISTIC_MIN_INPUTS.get(&self.cfg)
            || req.inputs.iter().map(|x| x.part_count()).sum::<usize>()
                >= COMPACTION_HEURISTIC_MIN_PARTS.get(&self.cfg)
            || req.inputs.iter().map(|x| x.len).sum::<usize>()
                >= COMPACTION_HEURISTIC_MIN_UPDATES.get(&self.cfg);
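        // For illustration only (hypothetical threshold values, the real ones are
        // dyncfgs): with min inputs = 8, min parts = 8, and min updates = 4096, a
        // request with 2 inputs totaling 3 parts and 100 updates is skipped, while
        // one with 10 inputs is enqueued.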
        if !should_compact {
            self.metrics.compaction.skipped.inc();
            return None;
        }

        let (compaction_completed_sender, compaction_completed_receiver) = oneshot::channel();
        let new_compaction_sender = self.sender.clone();

        self.metrics.compaction.requested.inc();
        // NB: we intentionally pass along the input machine, as it ought to come from the
        // writer that generated the compaction request / maintenance. this machine has a
        // spine structure that generated the request, so it has a much better chance of
        // merging and committing the result than a machine kept up-to-date through state
        // diffs, which may have a different spine structure less amenable to merging.
        let send = new_compaction_sender.try_send((
            Instant::now(),
            req,
            machine.clone(),
            compaction_completed_sender,
        ));
        if let Err(_) = send {
            self.metrics.compaction.dropped.inc();
            return None;
        }

        Some(compaction_completed_receiver)
    }

    pub(crate) async fn compact_and_apply(
        machine: &Machine<K, V, T, D>,
        req: CompactReq<T>,
        write_schemas: Schemas<K, V>,
    ) -> Result<(ApplyMergeResult, RoutineMaintenance), anyhow::Error> {
        let metrics = Arc::clone(&machine.applier.metrics);
        metrics.compaction.started.inc();
        let start = Instant::now();

        // pick a timeout for our compaction request proportional to the amount
        // of data that must be read (with a minimum set by PersistConfig)
        let total_input_bytes = req
            .inputs
            .iter()
            .map(|batch| batch.encoded_size_bytes())
            .sum::<usize>();
        let timeout = Duration::max(
            // either our minimum timeout
            COMPACTION_MINIMUM_TIMEOUT.get(&machine.applier.cfg),
            // or 1s per MB of input data
            Duration::from_secs(u64::cast_from(total_input_bytes / MiB)),
        );
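        // For illustration only (hypothetical sizes): a 4 GiB input gets
        // max(90s, 4096s) = a 4096s timeout, while a 10 MiB input falls back
        // to the 90s minimum.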
        // Always use the most recent schema from all the Runs we're compacting, to prevent
        // Compactors created before the schema was evolved from trying to "de-evolve" a Part.
        let compaction_schema_id = req
            .inputs
            .iter()
            .flat_map(|batch| batch.run_meta.iter())
            .filter_map(|run_meta| run_meta.schema)
            // It's an invariant that SchemaIds are ordered.
            .max();
        let maybe_compaction_schema = match compaction_schema_id {
            Some(id) => machine
                .get_schema(id)
                .map(|(key_schema, val_schema)| (id, key_schema, val_schema)),
            None => None,
        };
        let use_most_recent_schema = COMPACTION_USE_MOST_RECENT_SCHEMA.get(&machine.applier.cfg);

        let compaction_schema = match maybe_compaction_schema {
            Some((id, key_schema, val_schema)) if use_most_recent_schema => {
                metrics.compaction.schema_selection.recent_schema.inc();
                Schemas {
                    id: Some(id),
                    key: Arc::new(key_schema),
                    val: Arc::new(val_schema),
                }
            }
            Some(_) => {
                metrics.compaction.schema_selection.disabled.inc();
                write_schemas
            }
            None => {
                metrics.compaction.schema_selection.no_schema.inc();
                write_schemas
            }
        };

        trace!(
            "compaction request for {}MBs ({} bytes), with timeout of {}s, and schema {:?}.",
            total_input_bytes / MiB,
            total_input_bytes,
            timeout.as_secs_f64(),
            compaction_schema.id,
        );

        let compact_span = debug_span!("compact::consolidate");
        let res = tokio::time::timeout(
            timeout,
            // Compaction is cpu intensive, so be polite and spawn it on the isolated runtime.
            machine
                .isolated_runtime
                .spawn_named(
                    || "persist::compact::consolidate",
                    Self::compact(
                        CompactConfig::new(&machine.applier.cfg, machine.shard_id()),
                        Arc::clone(&machine.applier.state_versions.blob),
                        Arc::clone(&metrics),
                        Arc::clone(&machine.applier.shard_metrics),
                        Arc::clone(&machine.isolated_runtime),
                        req,
                        compaction_schema,
                    )
                    .instrument(compact_span),
                )
                .map_err(|e| anyhow!(e)),
        )
        .await;

        let res = match res {
            Ok(res) => res,
            Err(err) => {
                metrics.compaction.timed_out.inc();
                Err(anyhow!(err))
            }
        };

        metrics
            .compaction
            .seconds
            .inc_by(start.elapsed().as_secs_f64());

        match res {
            Ok(Ok(res)) => {
                let res = FueledMergeRes { output: res.output };
                let (apply_merge_result, maintenance) = machine.merge_res(&res).await;
                match &apply_merge_result {
                    ApplyMergeResult::AppliedExact => {
                        metrics.compaction.applied.inc();
                        metrics.compaction.applied_exact_match.inc();
                        machine.applier.shard_metrics.compaction_applied.inc();
                        Ok((apply_merge_result, maintenance))
                    }
                    ApplyMergeResult::AppliedSubset => {
                        metrics.compaction.applied.inc();
                        metrics.compaction.applied_subset_match.inc();
                        machine.applier.shard_metrics.compaction_applied.inc();
                        Ok((apply_merge_result, maintenance))
                    }
                    ApplyMergeResult::NotAppliedNoMatch
                    | ApplyMergeResult::NotAppliedInvalidSince
                    | ApplyMergeResult::NotAppliedTooManyUpdates => {
                        if let ApplyMergeResult::NotAppliedTooManyUpdates = &apply_merge_result {
                            metrics.compaction.not_applied_too_many_updates.inc();
                        }
                        metrics.compaction.noop.inc();
                        let mut part_deletes = PartDeletes::default();
                        for part in res.output.parts {
                            part_deletes.add(&part);
                        }
                        let () = part_deletes
                            .delete(
                                machine.applier.state_versions.blob.as_ref(),
                                machine.shard_id(),
                                GC_BLOB_DELETE_CONCURRENCY_LIMIT.get(&machine.applier.cfg),
                                &*metrics,
                                &metrics.retries.external.compaction_noop_delete,
                            )
                            .await;
                        Ok((apply_merge_result, maintenance))
                    }
                }
            }
            Ok(Err(err)) | Err(err) => {
                metrics.compaction.failed.inc();
                debug!(
                    "compaction for {} failed: {}",
                    machine.shard_id(),
                    err.display_with_causes()
                );
                Err(err)
            }
        }
    }

    /// Compacts input batches in bounded memory.
    ///
    /// The memory bound is broken into pieces:
    ///     1. in-progress work
    ///     2. fetching parts from runs
    ///     3. additional in-flight requests to Blob
    ///
    /// 1. In-progress work is bounded by 2 * [BatchBuilderConfig::blob_target_size]. This
    ///    usage is met at two mutually exclusive moments:
    ///   * When reading in a part, we hold the columnar format in memory while writing its
    ///     contents into a heap.
    ///   * When writing a part, we hold a temporary updates buffer while encoding/writing
    ///     it into a columnar format for Blob.
    ///
    /// 2. When compacting runs, only 1 part from each one is held in memory at a time.
    ///    Compaction will determine an appropriate number of runs to compact together
    ///    given the memory bound and accounting for the reservation in (1). A minimum
    ///    of 2 * [BatchBuilderConfig::blob_target_size] of memory is expected, to be
    ///    able to at least have the capacity to compact two runs together at a time,
    ///    and more runs will be compacted together if more memory is available.
    ///
    /// 3. If there is excess memory after accounting for (1) and (2), we increase the
    ///    number of outstanding parts we can keep in-flight to Blob.
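    ///
    /// For illustration only (hypothetical sizes, not defaults):
    /// ```text
    ///     compaction_memory_bound_bytes = 1024 MiB
    ///     blob_target_size              =  128 MiB
    ///
    ///     (1) in-progress reservation   = 2 * 128 MiB        = 256 MiB
    ///     (2) run budget                = 1024 MiB - 256 MiB = 768 MiB
    ///         => up to 6 runs of ~128 MiB parts compacted together per chunk
    ///     (3) any run budget left over within a chunk buys extra outstanding
    ///         parts in-flight to Blob
    /// ```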
    pub async fn compact(
        cfg: CompactConfig,
        blob: Arc<dyn Blob>,
        metrics: Arc<Metrics>,
        shard_metrics: Arc<ShardMetrics>,
        isolated_runtime: Arc<IsolatedRuntime>,
        req: CompactReq<T>,
        write_schemas: Schemas<K, V>,
    ) -> Result<CompactRes<T>, anyhow::Error> {
        let () = Self::validate_req(&req)?;

        // We introduced a fast-path optimization in https://github.com/MaterializeInc/materialize/pull/15363
        // but had to revert it due to a very scary bug. Here we count how many of our compaction reqs
        // could be eligible for the optimization to better understand whether it's worth trying to
        // reintroduce it.
        let mut single_nonempty_batch = None;
        for batch in &req.inputs {
            if batch.len > 0 {
                match single_nonempty_batch {
                    None => single_nonempty_batch = Some(batch),
                    Some(_previous_nonempty_batch) => {
                        single_nonempty_batch = None;
                        break;
                    }
                }
            }
        }
        if let Some(single_nonempty_batch) = single_nonempty_batch {
            if single_nonempty_batch.run_splits.len() == 0
                && single_nonempty_batch.desc.since() != &Antichain::from_elem(T::minimum())
            {
                metrics.compaction.fast_path_eligible.inc();
            }
        }

        // Compaction needs enough memory for at least 2 runs and 2 in-progress parts.
        assert!(cfg.compaction_memory_bound_bytes >= 4 * cfg.batch.blob_target_size);
        // reserve space for the in-progress part to be held in both its in-memory and
        // columnar representations
        let in_progress_part_reserved_memory_bytes = 2 * cfg.batch.blob_target_size;
        // then remaining memory will go towards pulling down as many runs as we can
        let run_reserved_memory_bytes =
            cfg.compaction_memory_bound_bytes - in_progress_part_reserved_memory_bytes;

        let mut all_parts = vec![];
        let mut all_run_splits = vec![];
        let mut all_run_meta = vec![];
        let mut len = 0;

        let ordered_runs =
            Self::order_runs(&req, cfg.batch.preferred_order, &*blob, &*metrics).await?;
        for (runs, run_chunk_max_memory_usage) in
            Self::chunk_runs(&ordered_runs, &cfg, &*metrics, run_reserved_memory_bytes)
        {
            metrics.compaction.chunks_compacted.inc();
            metrics
                .compaction
                .runs_compacted
                .inc_by(u64::cast_from(runs.len()));

            // given the runs we actually have in our batch, we might have extra memory
            // available. we reserved enough space to always have 1 in-progress part in
            // flight, but if we have excess, we can use it to increase our write parallelism
            let extra_outstanding_parts = (run_reserved_memory_bytes
                .saturating_sub(run_chunk_max_memory_usage))
                / cfg.batch.blob_target_size;
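            // For illustration only (hypothetical numbers): with 768 MiB of run budget,
            // a chunk whose largest parts sum to 256 MiB, and a 128 MiB blob_target_size,
            // (768 - 256) / 128 = 4 extra outstanding parts.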
            let mut run_cfg = cfg.clone();
            run_cfg.batch.batch_builder_max_outstanding_parts = 1 + extra_outstanding_parts;
            let batch = Self::compact_runs(
                &run_cfg,
                &req.shard_id,
                &req.desc,
                runs,
                Arc::clone(&blob),
                Arc::clone(&metrics),
                Arc::clone(&shard_metrics),
                Arc::clone(&isolated_runtime),
                write_schemas.clone(),
            )
            .await?;
            let (parts, run_splits, run_meta, updates) =
                (batch.parts, batch.run_splits, batch.run_meta, batch.len);
            assert!(
                (updates == 0 && parts.len() == 0) || (updates > 0 && parts.len() > 0),
                "updates={}, parts={}",
                updates,
                parts.len(),
            );

            if updates == 0 {
                continue;
            }
            // merge together parts and runs from each compaction round.
            // parts are appended onto our existing vec, and then we shift
            // the latest run offsets to account for prior parts.
            //
            // e.g. if we currently have 3 parts and 2 runs (including the implicit one from 0):
            //         parts: [k0, k1, k2]
            //         runs:  [    1     ]
            //
            // and we merge in another result with 2 parts and 2 runs:
            //         parts: [k3, k4]
            //         runs:  [    1]
            //
            // our result will contain 5 parts and 4 runs:
            //         parts: [k0, k1, k2, k3, k4]
            //         runs:  [    1       3   4 ]
            let run_offset = all_parts.len();
            if all_parts.len() > 0 {
                all_run_splits.push(run_offset);
            }
            all_run_splits.extend(run_splits.iter().map(|run_start| run_start + run_offset));
            all_run_meta.extend(run_meta);
            all_parts.extend(parts);
            len += updates;
        }

        Ok(CompactRes {
            output: HollowBatch::new(
                req.desc.clone(),
                all_parts,
                len,
                all_run_meta,
                all_run_splits,
            ),
        })
    }

    /// Sorts and groups all runs from the inputs into chunks, each of which has been determined
    /// to consume no more than `run_reserved_memory_bytes` at a time, unless the input parts
    /// were written with a different target size than this build. Uses [Self::order_runs] to
    /// determine the order in which runs are selected.
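    ///
    /// For illustration only (hypothetical part sizes, 768 MiB reserved):
    /// ```text
    ///     ordered runs (largest part):  A=300 MiB, B=300 MiB, C=300 MiB, D=100 MiB
    ///     chunks:                       [A, B] (600 MiB), [C, D] (400 MiB)
    /// ```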
    fn chunk_runs<'a>(
        ordered_runs: &'a [(&'a Description<T>, &'a RunMeta, Cow<'a, [RunPart<T>]>)],
        cfg: &CompactConfig,
        metrics: &Metrics,
        run_reserved_memory_bytes: usize,
    ) -> Vec<(
        Vec<(&'a Description<T>, &'a RunMeta, &'a [RunPart<T>])>,
        usize,
    )> {
        let mut ordered_runs = ordered_runs.into_iter().peekable();

        let mut chunks = vec![];
        let mut current_chunk = vec![];
        let mut current_chunk_max_memory_usage = 0;
        while let Some((desc, meta, run)) = ordered_runs.next() {
            let run_greatest_part_size = run
                .iter()
                .map(|x| x.max_part_bytes())
                .max()
                .unwrap_or(cfg.batch.blob_target_size);
            current_chunk.push((*desc, *meta, &**run));
            current_chunk_max_memory_usage += run_greatest_part_size;

            if let Some((_next_desc, _next_meta, next_run)) = ordered_runs.peek() {
                let next_run_greatest_part_size = next_run
                    .iter()
                    .map(|x| x.max_part_bytes())
                    .max()
                    .unwrap_or(cfg.batch.blob_target_size);

                // if we can fit the next run in our chunk without going over our reserved memory, we should do so
                if current_chunk_max_memory_usage + next_run_greatest_part_size
                    <= run_reserved_memory_bytes
                {
                    continue;
                }

                // NB: There's an edge case where we cannot fit at least 2 runs into a chunk
                // with our reserved memory. This could happen if blobs were written with a
                // larger target size than the current build. When this happens, we violate
                // our memory requirement and force chunks to be at least length 2, so that we
                // can be assured runs are merged and converge over time.
                if current_chunk.len() == 1 {
                    // in the steady state we expect this counter to be 0, and would only
                    // anticipate it being temporarily nonzero if we changed target blob size
                    // or our memory requirement calculations
                    metrics.compaction.memory_violations.inc();
                    continue;
                }
            }

            chunks.push((
                std::mem::take(&mut current_chunk),
                current_chunk_max_memory_usage,
            ));
            current_chunk_max_memory_usage = 0;
        }

        chunks
    }

    /// With bounded memory where we cannot compact all runs/parts together, the groupings
    /// in which we select runs to compact together will affect how much we're able to
    /// consolidate updates.
    ///
    /// This approach orders the input runs by cycling through each batch, selecting the
    /// head element until all are consumed. It assumes that it is generally more effective
    /// to prioritize compacting runs from different batches, rather than runs from within
    /// a single batch.
    ///
    /// ex.
    /// ```text
    ///        inputs                                        output
    ///     b0 runs=[A, B]
    ///     b1 runs=[C]                           output=[A, C, D, B, E, F]
    ///     b2 runs=[D, E, F]
    /// ```
    async fn order_runs<'a>(
        req: &'a CompactReq<T>,
        target_order: RunOrder,
        blob: &'a dyn Blob,
        metrics: &'a Metrics,
    ) -> anyhow::Result<Vec<(&'a Description<T>, &'a RunMeta, Cow<'a, [RunPart<T>]>)>> {
        let total_number_of_runs = req
            .inputs
            .iter()
            .map(|x| x.run_splits.len() + 1)
            .sum::<usize>();

        let mut batch_runs: VecDeque<_> = req
            .inputs
            .iter()
            .map(|batch| (&batch.desc, batch.runs()))
            .collect();

        let mut ordered_runs = Vec::with_capacity(total_number_of_runs);

        while let Some((desc, mut runs)) = batch_runs.pop_front() {
            if let Some((meta, run)) = runs.next() {
                let same_order = meta.order.unwrap_or(RunOrder::Codec) == target_order;
                if same_order {
                    ordered_runs.push((desc, meta, Cow::Borrowed(run)));
                } else {
                    // The downstream consolidation step will handle a long run that's not in
                    // the desired order by splitting it up into many single-element runs. This preserves
                    // correctness, but it means that we may end up needing to iterate through
                    // many more parts concurrently than expected, increasing memory use. Instead,
                    // we break up those runs into individual batch parts, fetching hollow runs as
                    // necessary, before they're grouped together to be passed to consolidation.
                    // The downside is that this breaks the usual property that compaction produces
                    // fewer runs than it takes in. This should generally be resolved by future
                    // runs of compaction.
                    for part in run {
                        let mut batch_parts = pin!(part.part_stream(req.shard_id, blob, metrics));
                        while let Some(part) = batch_parts.next().await {
                            ordered_runs.push((
                                desc,
                                meta,
                                Cow::Owned(vec![RunPart::Single(part?.into_owned())]),
                            ));
                        }
                    }
                }
                batch_runs.push_back((desc, runs));
            }
        }

        Ok(ordered_runs)
    }

    /// Compacts runs together. If the input runs are sorted, a single run will be created as output.
    ///
    /// Maximum possible memory usage is `(# runs + 2) * [crate::PersistConfig::blob_target_size]`
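    /// (one part held per input run, plus the in-progress read and write parts). For
    /// illustration only, with hypothetical sizes: 4 runs at a 128 MiB target size
    /// bound usage at roughly (4 + 2) * 128 MiB = 768 MiB.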
    pub(crate) async fn compact_runs(
        cfg: &CompactConfig,
        shard_id: &ShardId,
        desc: &Description<T>,
        runs: Vec<(&Description<T>, &RunMeta, &[RunPart<T>])>,
        blob: Arc<dyn Blob>,
        metrics: Arc<Metrics>,
        shard_metrics: Arc<ShardMetrics>,
        isolated_runtime: Arc<IsolatedRuntime>,
        write_schemas: Schemas<K, V>,
    ) -> Result<HollowBatch<T>, anyhow::Error> {
        // TODO: Figure out a more principled way to allocate our memory budget.
        // Currently, we give any excess budget to write parallelism. If we had
        // to pick between 100% towards writes vs 100% towards reads, then reads
        // is almost certainly better, but the ideal is probably somewhere in
        // between the two.
        //
        // For now, invent some extra budget out of thin air for prefetch.
        let prefetch_budget_bytes = 2 * cfg.batch.blob_target_size;

        let mut timings = Timings::default();

        let mut batch_cfg = cfg.batch.clone();

        // Use compaction as a method of getting inline writes out of state, to
        // make room for more inline writes. We could instead do this at the end
        // of compaction by flushing out the batch, but doing it here based on
        // the config allows BatchBuilder to do its normal pipelining of writes.
        batch_cfg.inline_writes_single_max_bytes = 0;

        let parts = BatchParts::new_ordered(
            batch_cfg,
            cfg.batch.preferred_order,
            Arc::clone(&metrics),
            Arc::clone(&shard_metrics),
            *shard_id,
            Arc::clone(&blob),
            Arc::clone(&isolated_runtime),
            &metrics.compaction.batch,
        );
        let mut batch = BatchBuilderInternal::<K, V, T, D>::new(
            cfg.batch.clone(),
            parts,
            Arc::clone(&metrics),
            write_schemas.clone(),
            Arc::clone(&blob),
            shard_id.clone(),
            cfg.version.clone(),
        );

        let mut consolidator = Consolidator::new(
            format!(
                "{}[lower={:?},upper={:?}]",
                shard_id,
                desc.lower().elements(),
                desc.upper().elements()
            ),
            *shard_id,
            StructuredSort::<K, V, T, D>::new(write_schemas.clone()),
            blob,
            Arc::clone(&metrics),
            shard_metrics,
            metrics.read.compaction.clone(),
            FetchBatchFilter::Compaction {
                since: desc.since().clone(),
            },
            prefetch_budget_bytes,
        );

        for (desc, meta, parts) in runs {
            consolidator.enqueue_run(desc, meta, parts.iter().cloned());
        }

        let remaining_budget = consolidator.start_prefetches();
        if remaining_budget.is_none() {
            metrics.compaction.not_all_prefetched.inc();
        }

        loop {
            let mut chunks = vec![];
            let mut total_bytes = 0;
            // We attempt to pull chunks out of the consolidator that match our target size,
            // but it's possible that we may get smaller chunks... for example, if not all
            // parts have been fetched yet. Loop until we've got enough data to justify flushing
            // it out to blob (or we run out of data.)
            while total_bytes < cfg.batch.blob_target_size {
                let fetch_start = Instant::now();
                let Some(chunk) = consolidator
                    .next_chunk(
                        cfg.compaction_yield_after_n_updates,
                        cfg.batch.blob_target_size - total_bytes,
                    )
                    .await?
                else {
                    break;
                };
                timings.part_fetching += fetch_start.elapsed();
                total_bytes += chunk.goodbytes();
                chunks.push(chunk);
                tokio::task::yield_now().await;
            }

            // In the hopefully-common case of a single chunk, this will not copy.
            let Some(updates) = Part::concat(&chunks).expect("compaction produces well-typed data")
            else {
                break;
            };
            batch.flush_part(desc.clone(), updates).await;
        }
        let mut batch = batch.finish(desc.clone()).await?;

        // We use compaction as a method of getting inline writes out of state,
        // to make room for more inline writes. This happens above by overriding
        // the inline writes threshold in the batch config. This is a bit
        // action-at-a-distance, so defensively detect if this breaks here and
        // log and correct it if so.
        let has_inline_parts = batch.batch.parts.iter().any(|x| x.is_inline());
        if has_inline_parts {
            error!(%shard_id, ?cfg, "compaction result unexpectedly had inline writes");
            let () = batch
                .flush_to_blob(
                    &cfg.batch,
                    &metrics.compaction.batch,
                    &isolated_runtime,
                    &write_schemas,
                )
                .await;
        }

        timings.record(&metrics);
        Ok(batch.into_hollow_batch())
    }

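    /// Checks that the inputs are exactly the batches needed to produce the
    /// requested output: the output since must be at or in advance of every
    /// input since, and the input batches must be consecutive, chaining from
    /// the output's lower bound to its upper bound.
    ///
    /// For illustration only (hypothetical bounds):
    /// ```text
    ///     output desc: lower=[0], upper=[30], since=[10]
    ///     inputs:      [0, 10), [10, 30)     => ok
    ///     inputs:      [0, 10), [20, 30)     => error: non-consecutive batches
    /// ```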
    fn validate_req(req: &CompactReq<T>) -> Result<(), anyhow::Error> {
        let mut frontier = req.desc.lower();
        for input in req.inputs.iter() {
            if PartialOrder::less_than(req.desc.since(), input.desc.since()) {
                return Err(anyhow!(
                    "output since {:?} must be at or in advance of input since {:?}",
                    req.desc.since(),
                    input.desc.since()
                ));
            }
            if frontier != input.desc.lower() {
                return Err(anyhow!(
                    "invalid merge of non-consecutive batches {:?} vs {:?}",
                    frontier,
                    input.desc.lower()
                ));
            }
            frontier = input.desc.upper();
        }
        if frontier != req.desc.upper() {
            return Err(anyhow!(
                "invalid merge of non-consecutive batches {:?} vs {:?}",
                frontier,
                req.desc.upper()
            ));
        }
        Ok(())
    }
}

#[derive(Debug, Default)]
struct Timings {
    part_fetching: Duration,
    heap_population: Duration,
}

impl Timings {
    fn record(self, metrics: &Metrics) {
        // intentionally deconstruct so we don't forget to consider each field
        let Timings {
            part_fetching,
            heap_population,
        } = self;

        metrics
            .compaction
            .steps
            .part_fetch_seconds
            .inc_by(part_fetching.as_secs_f64());
        metrics
            .compaction
            .steps
            .heap_population_seconds
            .inc_by(heap_population.as_secs_f64());
    }
}

#[cfg(test)]
mod tests {
    use mz_dyncfg::ConfigUpdates;
    use mz_ore::{assert_contains, assert_err};
    use mz_persist_types::codec_impls::StringSchema;
    use timely::order::Product;
    use timely::progress::Antichain;

    use crate::PersistLocation;
    use crate::batch::BLOB_TARGET_SIZE;
    use crate::tests::{all_ok, expect_fetch_part, new_test_client_cache};

    use super::*;

    // A regression test for a bug caught during development of materialize#13160 (never
    // made it to main) where batches written by compaction would always have a
    // since of the minimum timestamp.
    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
    async fn regression_minimum_since(dyncfgs: ConfigUpdates) {
        let data = vec![
            (("0".to_owned(), "zero".to_owned()), 0, 1),
            (("0".to_owned(), "zero".to_owned()), 1, -1),
            (("1".to_owned(), "one".to_owned()), 1, 1),
        ];

        let cache = new_test_client_cache(&dyncfgs);
        cache.cfg.set_config(&BLOB_TARGET_SIZE, 100);
        let (mut write, _) = cache
            .open(PersistLocation::new_in_mem())
            .await
            .expect("client construction failed")
            .expect_open::<String, String, u64, i64>(ShardId::new())
            .await;
        let b0 = write
            .expect_batch(&data[..1], 0, 1)
            .await
            .into_hollow_batch();
        let b1 = write
            .expect_batch(&data[1..], 1, 2)
            .await
            .into_hollow_batch();

        let req = CompactReq {
            shard_id: write.machine.shard_id(),
            desc: Description::new(
                b0.desc.lower().clone(),
                b1.desc.upper().clone(),
                Antichain::from_elem(10u64),
            ),
            inputs: vec![b0, b1],
        };
        let schemas = Schemas {
            id: None,
            key: Arc::new(StringSchema),
            val: Arc::new(StringSchema),
        };
        let res = Compactor::<String, String, u64, i64>::compact(
            CompactConfig::new(&write.cfg, write.shard_id()),
            Arc::clone(&write.blob),
            Arc::clone(&write.metrics),
            write.metrics.shards.shard(&write.machine.shard_id(), ""),
            Arc::new(IsolatedRuntime::default()),
            req.clone(),
            schemas.clone(),
        )
        .await
        .expect("compaction failed");

        assert_eq!(res.output.desc, req.desc);
        assert_eq!(res.output.len, 1);
        assert_eq!(res.output.part_count(), 1);
        let part = res.output.parts[0].expect_hollow_part();
        let (part, updates) = expect_fetch_part(
            write.blob.as_ref(),
            &part.key.complete(&write.machine.shard_id()),
            &write.metrics,
            &schemas,
        )
        .await;
        assert_eq!(part.desc, res.output.desc);
        assert_eq!(updates, all_ok(&data, 10));
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
    async fn compaction_partial_order(dyncfgs: ConfigUpdates) {
        let data = vec![
            (("0".to_owned(), "zero".to_owned()), Product::new(0, 10), 1),
            (("1".to_owned(), "one".to_owned()), Product::new(10, 0), 1),
        ];

        let cache = new_test_client_cache(&dyncfgs);
        cache.cfg.set_config(&BLOB_TARGET_SIZE, 100);
        let (mut write, _) = cache
            .open(PersistLocation::new_in_mem())
            .await
            .expect("client construction failed")
            .expect_open::<String, String, Product<u32, u32>, i64>(ShardId::new())
            .await;
        let b0 = write
            .batch(
                &data[..1],
                Antichain::from_elem(Product::new(0, 0)),
                Antichain::from_iter([Product::new(0, 11), Product::new(10, 0)]),
            )
            .await
            .expect("invalid usage")
            .into_hollow_batch();

        let b1 = write
            .batch(
                &data[1..],
                Antichain::from_iter([Product::new(0, 11), Product::new(10, 0)]),
                Antichain::from_elem(Product::new(10, 1)),
            )
            .await
            .expect("invalid usage")
            .into_hollow_batch();

        let req = CompactReq {
            shard_id: write.machine.shard_id(),
            desc: Description::new(
                b0.desc.lower().clone(),
                b1.desc.upper().clone(),
                Antichain::from_elem(Product::new(10, 0)),
            ),
            inputs: vec![b0, b1],
        };
        let schemas = Schemas {
            id: None,
            key: Arc::new(StringSchema),
            val: Arc::new(StringSchema),
        };
        let res = Compactor::<String, String, Product<u32, u32>, i64>::compact(
            CompactConfig::new(&write.cfg, write.shard_id()),
            Arc::clone(&write.blob),
            Arc::clone(&write.metrics),
            write.metrics.shards.shard(&write.machine.shard_id(), ""),
            Arc::new(IsolatedRuntime::default()),
            req.clone(),
            schemas.clone(),
        )
        .await
        .expect("compaction failed");

        assert_eq!(res.output.desc, req.desc);
        assert_eq!(res.output.len, 2);
        assert_eq!(res.output.part_count(), 1);
        let part = res.output.parts[0].expect_hollow_part();
        let (part, updates) = expect_fetch_part(
            write.blob.as_ref(),
            &part.key.complete(&write.machine.shard_id()),
            &write.metrics,
            &schemas,
        )
        .await;
        assert_eq!(part.desc, res.output.desc);
        assert_eq!(updates, all_ok(&data, Product::new(10, 0)));
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
    async fn disable_compaction(dyncfgs: ConfigUpdates) {
        let data = [
            (("0".to_owned(), "zero".to_owned()), 0, 1),
            (("0".to_owned(), "zero".to_owned()), 1, -1),
            (("1".to_owned(), "one".to_owned()), 1, 1),
        ];

        let cache = new_test_client_cache(&dyncfgs);
        cache.cfg.set_config(&BLOB_TARGET_SIZE, 100);
        let (mut write, _) = cache
            .open(PersistLocation::new_in_mem())
            .await
            .expect("client construction failed")
            .expect_open::<String, String, u64, i64>(ShardId::new())
            .await;
        let b0 = write
            .expect_batch(&data[..1], 0, 1)
            .await
            .into_hollow_batch();
        let b1 = write
            .expect_batch(&data[1..], 1, 2)
            .await
            .into_hollow_batch();

        let req = CompactReq {
            shard_id: write.machine.shard_id(),
            desc: Description::new(
                b0.desc.lower().clone(),
                b1.desc.upper().clone(),
                Antichain::from_elem(10u64),
            ),
            inputs: vec![b0, b1],
        };
        write.cfg.set_config(&COMPACTION_HEURISTIC_MIN_INPUTS, 1);
        let compactor = write.compact.as_ref().expect("compaction hard disabled");

        write.cfg.disable_compaction();
        let result = compactor
            .compact_and_apply_background(req.clone(), &write.machine)
            .expect("listener")
            .await
            .expect("channel closed");
        assert_err!(result);
        assert_contains!(result.unwrap_err().to_string(), "compaction disabled");

        write.cfg.enable_compaction();
        compactor
            .compact_and_apply_background(req, &write.machine)
            .expect("listener")
            .await
            .expect("channel closed")
            .expect("compaction success");

        // Make sure our CYA dyncfg works.
        let data2 = [
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("2".to_owned(), "two".to_owned()), 3, -1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
        ];

        let b2 = write
            .expect_batch(&data2[..1], 2, 3)
            .await
            .into_hollow_batch();
        let b3 = write
            .expect_batch(&data2[1..], 3, 4)
            .await
            .into_hollow_batch();

        let req = CompactReq {
            shard_id: write.machine.shard_id(),
            desc: Description::new(
                b2.desc.lower().clone(),
                b3.desc.upper().clone(),
                Antichain::from_elem(20u64),
            ),
            inputs: vec![b2, b3],
        };
        let compactor = write.compact.as_ref().expect("compaction hard disabled");

        // When the dyncfg is set to false we should ignore the process flag.
        write.cfg.set_config(&COMPACTION_CHECK_PROCESS_FLAG, false);
        write.cfg.disable_compaction();
        // Compaction still succeeded!
        compactor
            .compact_and_apply_background(req, &write.machine)
            .expect("listener")
            .await
            .expect("channel closed")
            .expect("compaction success");
    }
}