use std::collections::BTreeSet;
use std::fmt::Debug;
use std::marker::PhantomData;
use std::mem;
use std::sync::Arc;
use std::time::{Duration, Instant};

use anyhow::anyhow;
use differential_dataflow::difference::Monoid;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::trace::Description;
use futures::{Stream, pin_mut};
use futures_util::StreamExt;
use itertools::Either;
use mz_dyncfg::Config;
use mz_ore::cast::CastFrom;
use mz_ore::error::ErrorExt;
use mz_ore::now::NowFn;
use mz_ore::soft_assert_or_log;
use mz_persist::location::Blob;
use mz_persist_types::part::Part;
use mz_persist_types::{Codec, Codec64};
use timely::PartialOrder;
use timely::progress::{Antichain, Timestamp};
use tokio::sync::mpsc::Sender;
use tokio::sync::{TryAcquireError, mpsc, oneshot};
use tracing::{Instrument, Span, debug, debug_span, error, trace, warn};

use crate::async_runtime::IsolatedRuntime;
use crate::batch::{BatchBuilderConfig, BatchBuilderInternal, BatchParts, PartDeletes};
use crate::cfg::{
    COMPACTION_HEURISTIC_MIN_INPUTS, COMPACTION_HEURISTIC_MIN_PARTS,
    COMPACTION_HEURISTIC_MIN_UPDATES, COMPACTION_MEMORY_BOUND_BYTES,
    GC_BLOB_DELETE_CONCURRENCY_LIMIT, MiB,
};
use crate::fetch::{FetchBatchFilter, FetchConfig};
use crate::internal::encoding::Schemas;
use crate::internal::gc::GarbageCollector;
use crate::internal::machine::Machine;
use crate::internal::maintenance::RoutineMaintenance;
use crate::internal::metrics::ShardMetrics;
use crate::internal::state::{HollowBatch, RunMeta, RunOrder, RunPart};
use crate::internal::trace::{
    ActiveCompaction, ApplyMergeResult, CompactionInput, FueledMergeRes, IdHollowBatch, SpineId,
    id_range,
};
use crate::iter::{Consolidator, StructuredSort};
use crate::{Metrics, PersistConfig, ShardId};

#[derive(Debug, Clone)]
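/// A request for compaction.
///
/// Identifies the shard being compacted, the description the output batch
/// should have, and the input batches (with their spine ids) to consolidate.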
pub struct CompactReq<T> {
    pub shard_id: ShardId,
    pub desc: Description<T>,
    pub inputs: Vec<IdHollowBatch<T>>,
}

#[derive(Debug)]
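/// A response from compaction.
///
/// Pairs the compacted output batch with the `CompactionInput` it replaces.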
pub struct CompactRes<T> {
    pub output: HollowBatch<T>,
    pub input: CompactionInput,
}

#[derive(Debug, Clone)]
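/// A bundle of configuration used by compaction, captured from `PersistConfig`
/// and the dyncfg system when the config is constructed.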
pub struct CompactConfig {
    pub(crate) compaction_memory_bound_bytes: usize,
    pub(crate) compaction_yield_after_n_updates: usize,
    pub(crate) version: semver::Version,
    pub(crate) batch: BatchBuilderConfig,
    pub(crate) fetch_config: FetchConfig,
    pub(crate) now: NowFn,
}

impl CompactConfig {
    pub fn new(value: &PersistConfig, shard_id: ShardId) -> Self {
        CompactConfig {
            compaction_memory_bound_bytes: COMPACTION_MEMORY_BOUND_BYTES.get(value),
            compaction_yield_after_n_updates: value.compaction_yield_after_n_updates,
            version: value.build_version.clone(),
            batch: BatchBuilderConfig::new(value, shard_id),
            fetch_config: FetchConfig::from_persist_config(value),
            now: value.now.clone(),
        }
    }
}

#[derive(Debug)]
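/// A service for compacting batches of a persist shard.
///
/// Requests are enqueued onto a bounded channel and drained by a background
/// scheduler task, which runs them subject to a concurrency limit.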
pub struct Compactor<K, V, T, D> {
    cfg: PersistConfig,
    metrics: Arc<Metrics>,
    sender: Sender<(
        Instant,
        CompactReq<T>,
        Machine<K, V, T, D>,
        oneshot::Sender<Result<(), anyhow::Error>>,
    )>,
    _phantom: PhantomData<fn() -> D>,
}

impl<K, V, T, D> Clone for Compactor<K, V, T, D> {
    fn clone(&self) -> Self {
        Compactor {
            cfg: self.cfg.clone(),
            metrics: Arc::clone(&self.metrics),
            sender: self.sender.clone(),
            _phantom: Default::default(),
        }
    }
}

pub(crate) const COMPACTION_MINIMUM_TIMEOUT: Config<Duration> = Config::new(
    "persist_compaction_minimum_timeout",
    Duration::from_secs(90),
    "\
    The minimum amount of time to allow a persist compaction request to run \
    before timing it out (Materialize).",
);

pub(crate) const COMPACTION_CHECK_PROCESS_FLAG: Config<bool> = Config::new(
    "persist_compaction_check_process_flag",
    true,
    "Whether Compactor will obey the process_requests flag in PersistConfig, \
    which allows dynamically disabling compaction. If false, all compaction requests will be processed.",
);

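/// Builds a `CompactionInput::IdRange` covering the given set of spine ids.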
fn input_id_range(ids: BTreeSet<SpineId>) -> CompactionInput {
    let id = id_range(ids);

    CompactionInput::IdRange(id)
}

impl<K, V, T, D> Compactor<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64 + Sync,
    D: Monoid + Ord + Codec64 + Send + Sync,
{
    pub fn new(
        cfg: PersistConfig,
        metrics: Arc<Metrics>,
        gc: GarbageCollector<K, V, T, D>,
    ) -> Self {
        let (compact_req_sender, mut compact_req_receiver) = mpsc::channel::<(
            Instant,
            CompactReq<T>,
            Machine<K, V, T, D>,
            oneshot::Sender<Result<(), anyhow::Error>>,
        )>(cfg.compaction_queue_size);
        let concurrency_limit = Arc::new(tokio::sync::Semaphore::new(
            cfg.compaction_concurrency_limit,
        ));
        let check_process_requests = COMPACTION_CHECK_PROCESS_FLAG.handle(&cfg.configs);
        let process_requests = Arc::clone(&cfg.compaction_process_requests);

        let _worker_handle = mz_ore::task::spawn(|| "PersistCompactionScheduler", async move {
            while let Some((enqueued, req, machine, completer)) = compact_req_receiver.recv().await
            {
                assert_eq!(req.shard_id, machine.shard_id());
                let metrics = Arc::clone(&machine.applier.metrics);

                if check_process_requests.get()
                    && !process_requests.load(std::sync::atomic::Ordering::Relaxed)
                {
                    let _ = completer.send(Err(anyhow::anyhow!("compaction disabled")));
                    metrics.compaction.disabled.inc();
                    tracing::warn!(shard_id = ?req.shard_id, "Dropping compaction request on the floor.");

                    continue;
                }

                let permit = {
                    let inner = Arc::clone(&concurrency_limit);
                    match inner.try_acquire_owned() {
                        Ok(permit) => permit,
                        Err(TryAcquireError::NoPermits) => {
                            metrics.compaction.concurrency_waits.inc();
                            Arc::clone(&concurrency_limit)
                                .acquire_owned()
                                .await
                                .expect("semaphore is never closed")
                        }
                        Err(TryAcquireError::Closed) => {
                            warn!("semaphore for shard {} is closed", machine.shard_id());
                            continue;
                        }
                    }
                };
                metrics
                    .compaction
                    .queued_seconds
                    .inc_by(enqueued.elapsed().as_secs_f64());

                let compact_span =
                    debug_span!(parent: None, "compact::apply", shard_id=%machine.shard_id());
                compact_span.follows_from(&Span::current());
                let gc = gc.clone();
                mz_ore::task::spawn(|| "PersistCompactionWorker", async move {
                    let res = Self::compact_and_apply(&machine, req)
                        .instrument(compact_span)
                        .await;

                    match res {
                        Ok(maintenance) => maintenance.start_performing(&machine, &gc),
                        Err(err) => {
                            debug!(shard_id =? machine.shard_id(), "compaction failed: {err:#}")
                        }
                    }

                    let _ = completer.send(Ok(()));

                    drop(permit);
                });
            }
        });

        Compactor {
            cfg,
            metrics,
            sender: compact_req_sender,
            _phantom: PhantomData,
        }
    }

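    /// Enqueues a `CompactReq` to be compacted in the background, if it passes
    /// a cheap heuristic on the number and size of its inputs.
    ///
    /// Returns a receiver that resolves once the request has been compacted
    /// and applied (or has failed), or `None` if the request was skipped or
    /// dropped because the queue was full.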
    pub fn compact_and_apply_background(
        &self,
        req: CompactReq<T>,
        machine: &Machine<K, V, T, D>,
    ) -> Option<oneshot::Receiver<Result<(), anyhow::Error>>> {
        let should_compact = req.inputs.len() >= COMPACTION_HEURISTIC_MIN_INPUTS.get(&self.cfg)
            || req
                .inputs
                .iter()
                .map(|x| x.batch.part_count())
                .sum::<usize>()
                >= COMPACTION_HEURISTIC_MIN_PARTS.get(&self.cfg)
            || req.inputs.iter().map(|x| x.batch.len).sum::<usize>()
                >= COMPACTION_HEURISTIC_MIN_UPDATES.get(&self.cfg);
        if !should_compact {
            self.metrics.compaction.skipped.inc();
            return None;
        }

        let (compaction_completed_sender, compaction_completed_receiver) = oneshot::channel();
        let new_compaction_sender = self.sender.clone();

        self.metrics.compaction.requested.inc();
        let send = new_compaction_sender.try_send((
            Instant::now(),
            req,
            machine.clone(),
            compaction_completed_sender,
        ));
        if let Err(_) = send {
            self.metrics.compaction.dropped.inc();
            return None;
        }

        Some(compaction_completed_receiver)
    }

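    /// Compacts the batches in `req` and applies the result to the machine's
    /// state, with a timeout that scales with the total input size. Returns
    /// any routine maintenance produced by applying the results.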
    pub(crate) async fn compact_and_apply(
        machine: &Machine<K, V, T, D>,
        req: CompactReq<T>,
    ) -> Result<RoutineMaintenance, anyhow::Error> {
        let metrics = Arc::clone(&machine.applier.metrics);
        metrics.compaction.started.inc();
        let start = Instant::now();

        let total_input_bytes = req
            .inputs
            .iter()
            .map(|batch| batch.batch.encoded_size_bytes())
            .sum::<usize>();
        let timeout = Duration::max(
            COMPACTION_MINIMUM_TIMEOUT.get(&machine.applier.cfg),
            Duration::from_secs(u64::cast_from(total_input_bytes / MiB)),
        );
        let Some(compaction_schema_id) = req
            .inputs
            .iter()
            .flat_map(|batch| batch.batch.run_meta.iter())
            .filter_map(|run_meta| run_meta.schema)
            .max()
        else {
            metrics.compaction.schema_selection.no_schema.inc();
            metrics.compaction.failed.inc();
            return Err(anyhow!(
                "compacting {shard_id} and spine ids {spine_ids}: could not determine schema id from inputs",
                shard_id = req.shard_id,
                spine_ids = mz_ore::str::separated(", ", req.inputs.iter().map(|i| i.id))
            ));
        };
        let Some((key_schema, val_schema)) = machine.get_schema(compaction_schema_id) else {
            metrics.compaction.schema_selection.no_schema.inc();
            metrics.compaction.failed.inc();
            return Err(anyhow!(
                "compacting {shard_id} and spine ids {spine_ids}: schema id {compaction_schema_id} not present in machine state",
                shard_id = req.shard_id,
                spine_ids = mz_ore::str::separated(", ", req.inputs.iter().map(|i| i.id))
            ));
        };

        metrics.compaction.schema_selection.recent_schema.inc();

        let compaction_schema = Schemas {
            id: Some(compaction_schema_id),
            key: Arc::new(key_schema),
            val: Arc::new(val_schema),
        };

        trace!(
            "compaction request for {}MBs ({} bytes), with timeout of {}s, and schema {:?}.",
            total_input_bytes / MiB,
            total_input_bytes,
            timeout.as_secs_f64(),
            compaction_schema.id,
        );

        let isolated_runtime = Arc::clone(&machine.isolated_runtime);
        let machine_clone = machine.clone();
        let metrics_clone = Arc::clone(&machine.applier.metrics);
        let compact_span = debug_span!("compact::consolidate");
        let res = tokio::time::timeout(
            timeout,
            isolated_runtime.spawn_named(
                || "persist::compact::consolidate",
                async move {
                    let all_runs_have_uuids = req
                        .inputs
                        .iter()
                        .all(|x| x.batch.runs().all(|(meta, _)| meta.id.is_some()));
                    let all_runs_have_len = req
                        .inputs
                        .iter()
                        .all(|x| x.batch.runs().all(|(meta, _)| meta.len.is_some()));

                    let compact_cfg =
                        CompactConfig::new(&machine_clone.applier.cfg, machine_clone.shard_id());
                    let incremental_enabled = compact_cfg.batch.enable_incremental_compaction
                        && all_runs_have_uuids
                        && all_runs_have_len;
                    let stream = Self::compact_stream(
                        compact_cfg,
                        Arc::clone(&machine_clone.applier.state_versions.blob),
                        Arc::clone(&metrics_clone),
                        Arc::clone(&machine_clone.applier.shard_metrics),
                        Arc::clone(&machine_clone.isolated_runtime),
                        req.clone(),
                        compaction_schema,
                        incremental_enabled,
                    );

                    let maintenance = if incremental_enabled {
                        let mut maintenance = RoutineMaintenance::default();
                        pin_mut!(stream);
                        while let Some(res) = stream.next().await {
                            let res = res?;
                            let new_maintenance =
                                Self::apply(res, &metrics_clone, &machine_clone).await?;
                            maintenance.merge(new_maintenance);
                        }
                        maintenance
                    } else {
                        let res = Self::compact_all(stream, req.clone()).await?;
                        Self::apply(
                            FueledMergeRes {
                                output: res.output,
                                input: CompactionInput::Legacy,
                                new_active_compaction: None,
                            },
                            &metrics_clone,
                            &machine_clone,
                        )
                        .await?
                    };

                    Ok::<_, anyhow::Error>(maintenance)
                }
                .instrument(compact_span),
            ),
        )
        .await;

        metrics
            .compaction
            .seconds
            .inc_by(start.elapsed().as_secs_f64());
        let res = res.map_err(|e| {
            metrics.compaction.timed_out.inc();
            anyhow!(
                "compaction timed out after {}s: {}",
                timeout.as_secs_f64(),
                e
            )
        })?;

        match res {
            Ok(maintenance) => Ok(maintenance),
            Err(err) => {
                metrics.compaction.failed.inc();
                debug!(
                    "compaction for {} failed: {}",
                    machine.shard_id(),
                    err.display_with_causes()
                );
                Err(err)
            }
        }
    }

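    /// Drains a compaction stream and concatenates its outputs into a single
    /// `CompactRes` covering all of the request's inputs.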
    pub async fn compact_all(
        stream: impl Stream<Item = Result<FueledMergeRes<T>, anyhow::Error>>,
        req: CompactReq<T>,
    ) -> Result<CompactRes<T>, anyhow::Error> {
        pin_mut!(stream);

        let mut all_parts = vec![];
        let mut all_run_splits = vec![];
        let mut all_run_meta = vec![];
        let mut len = 0;

        while let Some(res) = stream.next().await {
            let res = res?.output;
            let (parts, updates, run_meta, run_splits) =
                (res.parts, res.len, res.run_meta, res.run_splits);

            if updates == 0 {
                continue;
            }

            let run_offset = all_parts.len();
            if !all_parts.is_empty() {
                all_run_splits.push(run_offset);
            }
            all_run_splits.extend(run_splits.iter().map(|r| r + run_offset));
            all_run_meta.extend(run_meta);
            all_parts.extend(parts);
            len += updates;
        }

        let batches = req.inputs.iter().map(|x| x.id).collect::<BTreeSet<_>>();
        let input = input_id_range(batches);

        Ok(CompactRes {
            output: HollowBatch::new(
                req.desc.clone(),
                all_parts,
                len,
                all_run_meta,
                all_run_splits,
            ),
            input,
        })
    }

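    /// Applies a merge result to the machine's state, recording the outcome in
    /// metrics. If the result could not be applied, the newly written parts
    /// are deleted from blob storage since nothing references them.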
    pub async fn apply(
        res: FueledMergeRes<T>,
        metrics: &Metrics,
        machine: &Machine<K, V, T, D>,
    ) -> Result<RoutineMaintenance, anyhow::Error> {
        let (apply_merge_result, maintenance) = machine.merge_res(&res).await;

        match &apply_merge_result {
            ApplyMergeResult::AppliedExact => {
                metrics.compaction.applied.inc();
                metrics.compaction.applied_exact_match.inc();
                machine.applier.shard_metrics.compaction_applied.inc();
            }
            ApplyMergeResult::AppliedSubset => {
                metrics.compaction.applied.inc();
                metrics.compaction.applied_subset_match.inc();
                machine.applier.shard_metrics.compaction_applied.inc();
            }
            ApplyMergeResult::NotAppliedNoMatch
            | ApplyMergeResult::NotAppliedInvalidSince
            | ApplyMergeResult::NotAppliedTooManyUpdates => {
                if let ApplyMergeResult::NotAppliedTooManyUpdates = &apply_merge_result {
                    metrics.compaction.not_applied_too_many_updates.inc();
                }
                metrics.compaction.noop.inc();
                let mut part_deletes = PartDeletes::default();
                for part in &res.output.parts {
                    part_deletes.add(part);
                }
                part_deletes
                    .delete(
                        machine.applier.state_versions.blob.as_ref(),
                        machine.shard_id(),
                        GC_BLOB_DELETE_CONCURRENCY_LIMIT.get(&machine.applier.cfg),
                        &*metrics,
                        &metrics.retries.external.compaction_noop_delete,
                    )
                    .await;
            }
        };

        Ok(maintenance)
    }

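    /// Compacts the requested batches together, yielding a stream of merge
    /// results as they are produced.
    ///
    /// The input runs are grouped into chunks that fit the configured memory
    /// bound, each chunk is compacted via `Self::compact_runs`, and one
    /// `FueledMergeRes` is yielded per chunk. When `incremental_enabled` is
    /// false, every result uses the full request's description so the caller
    /// can concatenate the outputs into a single batch.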
    pub fn compact_stream(
        cfg: CompactConfig,
        blob: Arc<dyn Blob>,
        metrics: Arc<Metrics>,
        shard_metrics: Arc<ShardMetrics>,
        isolated_runtime: Arc<IsolatedRuntime>,
        req: CompactReq<T>,
        write_schemas: Schemas<K, V>,
        incremental_enabled: bool,
    ) -> impl Stream<Item = Result<FueledMergeRes<T>, anyhow::Error>> {
        async_stream::stream! {
            let () = Self::validate_req(&req)?;

            let mut single_nonempty_batch = None;
            for batch in &req.inputs {
                if batch.batch.len > 0 {
                    match single_nonempty_batch {
                        None => single_nonempty_batch = Some(batch),
                        Some(_previous_nonempty_batch) => {
                            single_nonempty_batch = None;
                            break;
                        }
                    }
                }
            }
            if let Some(single_nonempty_batch) = single_nonempty_batch {
                if single_nonempty_batch.batch.run_splits.len() == 0
                    && single_nonempty_batch.batch.desc.since() != &Antichain::from_elem(T::minimum())
                {
                    metrics.compaction.fast_path_eligible.inc();
                }
            }

            let in_progress_part_reserved_memory_bytes = 2 * cfg.batch.blob_target_size;
            let run_reserved_memory_bytes = cfg
                .compaction_memory_bound_bytes
                .saturating_sub(in_progress_part_reserved_memory_bytes);

            let chunked_runs = Self::chunk_runs(
                &req,
                &cfg,
                &*metrics,
                run_reserved_memory_bytes,
                req.desc.since()
            );
            let total_chunked_runs = chunked_runs.len();

            let parts_before = req.inputs.iter().map(|x| x.batch.parts.len()).sum::<usize>();
            let parts_after = chunked_runs.iter().flat_map(|(_, _, runs, _)| runs.iter().map(|(_, _, parts)| parts.len())).sum::<usize>();
            assert_eq!(parts_before, parts_after, "chunking should not change the number of parts");

            for (applied, (input, desc, runs, run_chunk_max_memory_usage)) in
                chunked_runs.into_iter().enumerate()
            {
                metrics.compaction.chunks_compacted.inc();
                metrics
                    .compaction
                    .runs_compacted
                    .inc_by(u64::cast_from(runs.len()));

                let extra_outstanding_parts = (run_reserved_memory_bytes
                    .saturating_sub(run_chunk_max_memory_usage))
                    / cfg.batch.blob_target_size;
                let mut run_cfg = cfg.clone();
                run_cfg.batch.batch_builder_max_outstanding_parts = 1 + extra_outstanding_parts;

                let desc = if incremental_enabled {
                    desc
                } else {
                    req.desc.clone()
                };

                let runs = runs.iter()
                    .map(|(desc, meta, run)| (*desc, *meta, *run))
                    .collect::<Vec<_>>();

                let batch = Self::compact_runs(
                    &run_cfg,
                    &req.shard_id,
                    &desc,
                    runs,
                    Arc::clone(&blob),
                    Arc::clone(&metrics),
                    Arc::clone(&shard_metrics),
                    Arc::clone(&isolated_runtime),
                    write_schemas.clone(),
                )
                .await?;

                assert!(
                    (batch.len == 0 && batch.parts.len() == 0) || (batch.len > 0 && batch.parts.len() > 0),
                    "updates={}, parts={}",
                    batch.len,
                    batch.parts.len(),
                );

                let clock = cfg.now.clone();
                let active_compaction = if applied < total_chunked_runs - 1 {
                    Some(ActiveCompaction { start_ms: clock() })
                } else {
                    None
                };

                let res = CompactRes {
                    output: batch,
                    input,
                };

                let res = FueledMergeRes {
                    output: res.output,
                    new_active_compaction: active_compaction,
                    input: res.input,
                };

                yield Ok(res);
            }
        }
    }

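    /// Compacts the requested batches in one shot and returns the combined
    /// result: a convenience wrapper around `Self::compact_stream` and
    /// `Self::compact_all` with incremental compaction disabled.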
    pub async fn compact(
        cfg: CompactConfig,
        blob: Arc<dyn Blob>,
        metrics: Arc<Metrics>,
        shard_metrics: Arc<ShardMetrics>,
        isolated_runtime: Arc<IsolatedRuntime>,
        req: CompactReq<T>,
        write_schemas: Schemas<K, V>,
    ) -> Result<CompactRes<T>, anyhow::Error> {
        let stream = Self::compact_stream(
            cfg,
            Arc::clone(&blob),
            Arc::clone(&metrics),
            Arc::clone(&shard_metrics),
            Arc::clone(&isolated_runtime),
            req.clone(),
            write_schemas,
            false,
        );

        Self::compact_all(stream, req).await
    }

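    /// Groups the input runs into chunks that can each be compacted within the
    /// memory budget. Whole batches are packed into a chunk while they fit; a
    /// batch too large on its own is split into per-run chunks reported as
    /// `CompactionInput::PartialBatch`. Each returned chunk carries the input
    /// it covers, a description spanning its batches, the runs themselves, and
    /// an estimate of its maximum memory usage.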
    fn chunk_runs<'a>(
        req: &'a CompactReq<T>,
        cfg: &CompactConfig,
        metrics: &Metrics,
        run_reserved_memory_bytes: usize,
        since: &Antichain<T>,
    ) -> Vec<(
        CompactionInput,
        Description<T>,
        Vec<(&'a Description<T>, &'a RunMeta, &'a [RunPart<T>])>,
        usize,
    )> {
        let _ = input_id_range(req.inputs.iter().map(|x| x.id).collect());

        let mut batches: Vec<_> = req.inputs.iter().map(|x| (x.id, &*x.batch)).collect();
        batches.sort_by_key(|(id, _)| *id);

        let mut chunks = vec![];
        let mut current_chunk_ids = BTreeSet::new();
        let mut current_chunk_descs = Vec::new();
        let mut current_chunk_runs = vec![];
        let mut current_chunk_max_memory_usage = 0;

        fn max_part_bytes<T>(parts: &[RunPart<T>], cfg: &CompactConfig) -> usize {
            parts
                .iter()
                .map(|p| p.max_part_bytes())
                .max()
                .unwrap_or(cfg.batch.blob_target_size)
        }

        fn desc_range<T: Timestamp>(
            descs: impl IntoIterator<Item = Description<T>>,
            since: Antichain<T>,
        ) -> Description<T> {
            let mut descs = descs.into_iter();
            let first = descs.next().expect("non-empty set of descriptions");
            let lower = first.lower().clone();
            let mut upper = first.upper().clone();
            for desc in descs {
                assert_eq!(&upper, desc.lower());
                upper = desc.upper().clone();
            }
            let upper = upper.clone();
            Description::new(lower, upper, since)
        }

        for (spine_id, batch) in batches {
            let batch_size = batch
                .runs()
                .map(|(_, parts)| max_part_bytes(parts, cfg))
                .sum::<usize>();

            let num_runs = batch.run_meta.len();

            let runs = batch.runs().flat_map(|(meta, parts)| {
                if meta.order.unwrap_or(RunOrder::Codec) == cfg.batch.preferred_order {
                    Either::Left(std::iter::once((&batch.desc, meta, parts)))
                } else {
                    soft_assert_or_log!(
                        !parts.iter().any(|r| matches!(r, RunPart::Many(_))),
                        "unexpected out-of-order hollow run"
                    );
                    Either::Right(
                        parts
                            .iter()
                            .map(move |p| (&batch.desc, meta, std::slice::from_ref(p))),
                    )
                }
            });

            if current_chunk_max_memory_usage + batch_size <= run_reserved_memory_bytes
                || current_chunk_runs.len() + num_runs <= 2
            {
                if current_chunk_max_memory_usage + batch_size > run_reserved_memory_bytes {
                    metrics.compaction.memory_violations.inc();
                }
                current_chunk_ids.insert(spine_id);
                current_chunk_descs.push(batch.desc.clone());
                current_chunk_runs.extend(runs);
                current_chunk_max_memory_usage += batch_size;
                continue;
            }

            if !current_chunk_ids.is_empty() {
                chunks.push((
                    input_id_range(std::mem::take(&mut current_chunk_ids)),
                    desc_range(mem::take(&mut current_chunk_descs), since.clone()),
                    std::mem::take(&mut current_chunk_runs),
                    current_chunk_max_memory_usage,
                ));
                current_chunk_max_memory_usage = 0;
            }

            if batch_size <= run_reserved_memory_bytes {
                current_chunk_ids.insert(spine_id);
                current_chunk_descs.push(batch.desc.clone());
                current_chunk_runs.extend(runs);
                current_chunk_max_memory_usage += batch_size;
                continue;
            }

            let mut run_iter = runs.into_iter().peekable();
            debug_assert!(current_chunk_ids.is_empty());
            debug_assert!(current_chunk_descs.is_empty());
            debug_assert!(current_chunk_runs.is_empty());
            debug_assert_eq!(current_chunk_max_memory_usage, 0);
            let mut current_chunk_run_ids = BTreeSet::new();

            while let Some((desc, meta, parts)) = run_iter.next() {
                let run_size = max_part_bytes(parts, cfg);
                current_chunk_runs.push((desc, meta, parts));
                current_chunk_max_memory_usage += run_size;
                current_chunk_run_ids.extend(meta.id);

                if let Some((_, _meta, next_parts)) = run_iter.peek() {
                    let next_size = max_part_bytes(next_parts, cfg);
                    if current_chunk_max_memory_usage + next_size > run_reserved_memory_bytes {
                        if current_chunk_runs.len() == 1 {
                            metrics.compaction.memory_violations.inc();
                            continue;
                        }
                        chunks.push((
                            CompactionInput::PartialBatch(
                                spine_id,
                                mem::take(&mut current_chunk_run_ids),
                            ),
                            desc_range([batch.desc.clone()], since.clone()),
                            std::mem::take(&mut current_chunk_runs),
                            current_chunk_max_memory_usage,
                        ));
                        current_chunk_max_memory_usage = 0;
                    }
                }
            }

            if !current_chunk_runs.is_empty() {
                chunks.push((
                    CompactionInput::PartialBatch(spine_id, mem::take(&mut current_chunk_run_ids)),
                    desc_range([batch.desc.clone()], since.clone()),
                    std::mem::take(&mut current_chunk_runs),
                    current_chunk_max_memory_usage,
                ));
                current_chunk_max_memory_usage = 0;
            }
        }

        if !current_chunk_ids.is_empty() {
            chunks.push((
                input_id_range(current_chunk_ids),
                desc_range(current_chunk_descs, since.clone()),
                current_chunk_runs,
                current_chunk_max_memory_usage,
            ));
        }

        chunks
    }

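    /// Consolidates the given runs into a new `HollowBatch`, flushing
    /// completed parts to blob storage as they reach the target size.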
    pub(crate) async fn compact_runs(
        cfg: &CompactConfig,
        shard_id: &ShardId,
        desc: &Description<T>,
        runs: Vec<(&Description<T>, &RunMeta, &[RunPart<T>])>,
        blob: Arc<dyn Blob>,
        metrics: Arc<Metrics>,
        shard_metrics: Arc<ShardMetrics>,
        isolated_runtime: Arc<IsolatedRuntime>,
        write_schemas: Schemas<K, V>,
    ) -> Result<HollowBatch<T>, anyhow::Error> {
        let prefetch_budget_bytes = 2 * cfg.batch.blob_target_size;

        let mut timings = Timings::default();

        let mut batch_cfg = cfg.batch.clone();

        batch_cfg.inline_writes_single_max_bytes = 0;

        let parts = BatchParts::new_ordered::<D>(
            batch_cfg,
            cfg.batch.preferred_order,
            Arc::clone(&metrics),
            Arc::clone(&shard_metrics),
            *shard_id,
            Arc::clone(&blob),
            Arc::clone(&isolated_runtime),
            &metrics.compaction.batch,
        );
        let mut batch = BatchBuilderInternal::<K, V, T, D>::new(
            cfg.batch.clone(),
            parts,
            Arc::clone(&metrics),
            write_schemas.clone(),
            Arc::clone(&blob),
            shard_id.clone(),
            cfg.version.clone(),
        );

        let mut consolidator = Consolidator::new(
            format!(
                "{}[lower={:?},upper={:?}]",
                shard_id,
                desc.lower().elements(),
                desc.upper().elements()
            ),
            cfg.fetch_config.clone(),
            *shard_id,
            StructuredSort::<K, V, T, D>::new(write_schemas.clone()),
            blob,
            Arc::clone(&metrics),
            shard_metrics,
            metrics.read.compaction.clone(),
            FetchBatchFilter::Compaction {
                since: desc.since().clone(),
            },
            None,
            prefetch_budget_bytes,
        );

        for (desc, meta, parts) in runs {
            consolidator.enqueue_run(desc, meta, parts.iter().cloned());
        }

        let remaining_budget = consolidator.start_prefetches();
        if remaining_budget.is_none() {
            metrics.compaction.not_all_prefetched.inc();
        }

        loop {
            let mut chunks = vec![];
            let mut total_bytes = 0;
            while total_bytes < cfg.batch.blob_target_size {
                let fetch_start = Instant::now();
                let Some(chunk) = consolidator
                    .next_chunk(
                        cfg.compaction_yield_after_n_updates,
                        cfg.batch.blob_target_size - total_bytes,
                    )
                    .await?
                else {
                    break;
                };
                timings.part_fetching += fetch_start.elapsed();
                total_bytes += chunk.goodbytes();
                chunks.push(chunk);
                tokio::task::yield_now().await;
            }

            let Some(updates) = Part::concat(&chunks).expect("compaction produces well-typed data")
            else {
                break;
            };
            batch.flush_part(desc.clone(), updates).await;
        }
        let mut batch = batch.finish(desc.clone()).await?;

        let has_inline_parts = batch.batch.parts.iter().any(|x| x.is_inline());
        if has_inline_parts {
            error!(%shard_id, ?cfg, "compaction result unexpectedly had inline writes");
            let () = batch
                .flush_to_blob(
                    &cfg.batch,
                    &metrics.compaction.batch,
                    &isolated_runtime,
                    &write_schemas,
                )
                .await;
        }

        timings.record(&metrics);
        Ok(batch.into_hollow_batch())
    }

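    /// Validates that the request's inputs are consecutive, exactly cover the
    /// output description's bounds, and have sinces no greater than the output
    /// since.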
    fn validate_req(req: &CompactReq<T>) -> Result<(), anyhow::Error> {
        let mut frontier = req.desc.lower();
        for input in req.inputs.iter() {
            if PartialOrder::less_than(req.desc.since(), input.batch.desc.since()) {
                return Err(anyhow!(
                    "output since {:?} must be at or in advance of input since {:?}",
                    req.desc.since(),
                    input.batch.desc.since()
                ));
            }
            if frontier != input.batch.desc.lower() {
                return Err(anyhow!(
                    "invalid merge of non-consecutive batches {:?} vs {:?}",
                    frontier,
                    input.batch.desc.lower()
                ));
            }
            frontier = input.batch.desc.upper();
        }
        if frontier != req.desc.upper() {
            return Err(anyhow!(
                "invalid merge of non-consecutive batches {:?} vs {:?}",
                frontier,
                req.desc.upper()
            ));
        }
        Ok(())
    }
}

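/// Wall-clock timings of compaction phases, recorded into metrics when the
/// compaction completes.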
#[derive(Debug, Default)]
struct Timings {
    part_fetching: Duration,
    heap_population: Duration,
}

impl Timings {
    fn record(self, metrics: &Metrics) {
        let Timings {
            part_fetching,
            heap_population,
        } = self;

        metrics
            .compaction
            .steps
            .part_fetch_seconds
            .inc_by(part_fetching.as_secs_f64());
        metrics
            .compaction
            .steps
            .heap_population_seconds
            .inc_by(heap_population.as_secs_f64());
    }
}

#[cfg(test)]
mod tests {
    use mz_dyncfg::ConfigUpdates;
    use mz_ore::{assert_contains, assert_err};
    use mz_persist_types::codec_impls::StringSchema;
    use timely::progress::Antichain;

    use crate::PersistLocation;
    use crate::batch::BLOB_TARGET_SIZE;
    use crate::internal::trace::SpineId;
    use crate::tests::{all_ok, expect_fetch_part, new_test_client_cache};

    use super::*;

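    // Compacts two batches at a since in advance of the inputs' sinces and
    // checks that the output matches the requested description.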
    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn regression_minimum_since(dyncfgs: ConfigUpdates) {
        let data = vec![
            (("0".to_owned(), "zero".to_owned()), 0, 1),
            (("0".to_owned(), "zero".to_owned()), 1, -1),
            (("1".to_owned(), "one".to_owned()), 1, 1),
        ];

        let cache = new_test_client_cache(&dyncfgs);
        cache.cfg.set_config(&BLOB_TARGET_SIZE, 100);
        let (mut write, _) = cache
            .open(PersistLocation::new_in_mem())
            .await
            .expect("client construction failed")
            .expect_open::<String, String, u64, i64>(ShardId::new())
            .await;
        let b0 = write
            .expect_batch(&data[..1], 0, 1)
            .await
            .into_hollow_batch();
        let b1 = write
            .expect_batch(&data[1..], 1, 2)
            .await
            .into_hollow_batch();

        let req = CompactReq {
            shard_id: write.machine.shard_id(),
            desc: Description::new(
                b0.desc.lower().clone(),
                b1.desc.upper().clone(),
                Antichain::from_elem(10u64),
            ),
            inputs: vec![
                IdHollowBatch {
                    batch: Arc::new(b0),
                    id: SpineId(0, 1),
                },
                IdHollowBatch {
                    batch: Arc::new(b1),
                    id: SpineId(1, 2),
                },
            ],
        };
        let schemas = Schemas {
            id: None,
            key: Arc::new(StringSchema),
            val: Arc::new(StringSchema),
        };
        let res = Compactor::<String, String, u64, i64>::compact(
            CompactConfig::new(&write.cfg, write.shard_id()),
            Arc::clone(&write.blob),
            Arc::clone(&write.metrics),
            write.metrics.shards.shard(&write.machine.shard_id(), ""),
            Arc::new(IsolatedRuntime::new_for_tests()),
            req.clone(),
            schemas.clone(),
        )
        .await
        .expect("compaction failed");

        assert_eq!(res.output.desc, req.desc);
        assert_eq!(res.output.len, 1);
        assert_eq!(res.output.part_count(), 1);
        let part = res.output.parts[0].expect_hollow_part();
        let (part, updates) = expect_fetch_part(
            write.blob.as_ref(),
            &part.key.complete(&write.machine.shard_id()),
            &write.metrics,
            &schemas,
        )
        .await;
        assert_eq!(part.desc, res.output.desc);
        assert_eq!(updates, all_ok(&data, 10));
    }

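    // Exercises dynamically disabling and re-enabling compaction via
    // `disable_compaction`/`enable_compaction` and the
    // persist_compaction_check_process_flag dyncfg.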
    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn disable_compaction(dyncfgs: ConfigUpdates) {
        let data = [
            (("0".to_owned(), "zero".to_owned()), 0, 1),
            (("0".to_owned(), "zero".to_owned()), 1, -1),
            (("1".to_owned(), "one".to_owned()), 1, 1),
        ];

        let cache = new_test_client_cache(&dyncfgs);
        cache.cfg.set_config(&BLOB_TARGET_SIZE, 100);
        let (mut write, _) = cache
            .open(PersistLocation::new_in_mem())
            .await
            .expect("client construction failed")
            .expect_open::<String, String, u64, i64>(ShardId::new())
            .await;
        let b0 = write
            .expect_batch(&data[..1], 0, 1)
            .await
            .into_hollow_batch();
        let b1 = write
            .expect_batch(&data[1..], 1, 2)
            .await
            .into_hollow_batch();

        let req = CompactReq {
            shard_id: write.machine.shard_id(),
            desc: Description::new(
                b0.desc.lower().clone(),
                b1.desc.upper().clone(),
                Antichain::from_elem(10u64),
            ),
            inputs: vec![
                IdHollowBatch {
                    batch: Arc::new(b0),
                    id: SpineId(0, 1),
                },
                IdHollowBatch {
                    batch: Arc::new(b1),
                    id: SpineId(1, 2),
                },
            ],
        };
        write.cfg.set_config(&COMPACTION_HEURISTIC_MIN_INPUTS, 1);
        let compactor = write.compact.as_ref().expect("compaction hard disabled");

        write.cfg.disable_compaction();
        let result = compactor
            .compact_and_apply_background(req.clone(), &write.machine)
            .expect("listener")
            .await
            .expect("channel closed");
        assert_err!(result);
        assert_contains!(result.unwrap_err().to_string(), "compaction disabled");

        write.cfg.enable_compaction();
        compactor
            .compact_and_apply_background(req, &write.machine)
            .expect("listener")
            .await
            .expect("channel closed")
            .expect("compaction success");

        let data2 = [
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("2".to_owned(), "two".to_owned()), 3, -1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
        ];

        let b2 = write
            .expect_batch(&data2[..1], 2, 3)
            .await
            .into_hollow_batch();
        let b3 = write
            .expect_batch(&data2[1..], 3, 4)
            .await
            .into_hollow_batch();

        let req = CompactReq {
            shard_id: write.machine.shard_id(),
            desc: Description::new(
                b2.desc.lower().clone(),
                b3.desc.upper().clone(),
                Antichain::from_elem(20u64),
            ),
            inputs: vec![
                IdHollowBatch {
                    batch: Arc::new(b2),
                    id: SpineId(0, 1),
                },
                IdHollowBatch {
                    batch: Arc::new(b3),
                    id: SpineId(1, 2),
                },
            ],
        };
        let compactor = write.compact.as_ref().expect("compaction hard disabled");

        write.cfg.set_config(&COMPACTION_CHECK_PROCESS_FLAG, false);
        write.cfg.disable_compaction();
        compactor
            .compact_and_apply_background(req, &write.machine)
            .expect("listener")
            .await
            .expect("channel closed")
            .expect("compaction success");
    }
}