use std::collections::BTreeSet;
use std::fmt::Debug;
use std::marker::PhantomData;
use std::mem;
use std::sync::Arc;
use std::time::{Duration, Instant};

use anyhow::anyhow;
use differential_dataflow::difference::Monoid;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::trace::Description;
use futures::{Stream, pin_mut};
use futures_util::StreamExt;
use itertools::Either;
use mz_dyncfg::Config;
use mz_ore::cast::CastFrom;
use mz_ore::error::ErrorExt;
use mz_ore::now::NowFn;
use mz_ore::soft_assert_or_log;
use mz_persist::location::Blob;
use mz_persist_types::part::Part;
use mz_persist_types::{Codec, Codec64};
use timely::PartialOrder;
use timely::progress::{Antichain, Timestamp};
use tokio::sync::mpsc::Sender;
use tokio::sync::{TryAcquireError, mpsc, oneshot};
use tracing::{Instrument, Span, debug, debug_span, error, trace, warn};

use crate::async_runtime::IsolatedRuntime;
use crate::batch::{BatchBuilderConfig, BatchBuilderInternal, BatchParts, PartDeletes};
use crate::cfg::{
    COMPACTION_HEURISTIC_MIN_INPUTS, COMPACTION_HEURISTIC_MIN_PARTS,
    COMPACTION_HEURISTIC_MIN_UPDATES, COMPACTION_MEMORY_BOUND_BYTES,
    GC_BLOB_DELETE_CONCURRENCY_LIMIT, MiB,
};
use crate::fetch::{FetchBatchFilter, FetchConfig};
use crate::internal::encoding::Schemas;
use crate::internal::gc::GarbageCollector;
use crate::internal::machine::Machine;
use crate::internal::maintenance::RoutineMaintenance;
use crate::internal::metrics::ShardMetrics;
use crate::internal::state::{HollowBatch, RunMeta, RunOrder, RunPart};
use crate::internal::trace::{
    ActiveCompaction, ApplyMergeResult, CompactionInput, FueledMergeRes, IdHollowBatch, SpineId,
    id_range,
};
use crate::iter::{Consolidator, StructuredSort};
use crate::{Metrics, PersistConfig, ShardId};

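/// A request to compact a contiguous range of a shard's batches into a single
/// output batch.
///
/// `shard_id` names the shard the batches belong to, `desc` describes the
/// output batch (its bounds must exactly cover the inputs, and its since may
/// be in advance of theirs), and `inputs` are the batches to compact, tagged
/// with their spine ids.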
#[derive(Debug, Clone)]
pub struct CompactReq<T> {
    pub shard_id: ShardId,
    pub desc: Description<T>,
    pub inputs: Vec<IdHollowBatch<T>>,
}

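/// The result of compaction: the compacted batch, plus a description of which
/// inputs it replaces.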
#[derive(Debug)]
pub struct CompactRes<T> {
    pub output: HollowBatch<T>,
    pub input: CompactionInput,
}

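/// A snapshot of the dynamic configuration relevant to compaction, captured
/// once per run to make an individual compaction easier to reason about.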
#[derive(Debug, Clone)]
pub struct CompactConfig {
    pub(crate) compaction_memory_bound_bytes: usize,
    pub(crate) compaction_yield_after_n_updates: usize,
    pub(crate) version: semver::Version,
    pub(crate) batch: BatchBuilderConfig,
    pub(crate) fetch_config: FetchConfig,
    pub(crate) now: NowFn,
}

impl CompactConfig {
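    /// Initialize the compaction config from Persist's config.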
    pub fn new(value: &PersistConfig, shard_id: ShardId) -> Self {
        CompactConfig {
            compaction_memory_bound_bytes: COMPACTION_MEMORY_BOUND_BYTES.get(value),
            compaction_yield_after_n_updates: value.compaction_yield_after_n_updates,
            version: value.build_version.clone(),
            batch: BatchBuilderConfig::new(value, shard_id),
            fetch_config: FetchConfig::from_persist_config(value),
            now: value.now.clone(),
        }
    }
}

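/// A service for performing physical and logical compaction.
///
/// Requests are submitted to a bounded queue and processed by a background
/// scheduler task, which limits how many compactions run concurrently.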
#[derive(Debug)]
pub struct Compactor<K, V, T, D> {
    cfg: PersistConfig,
    metrics: Arc<Metrics>,
    sender: Sender<(
        Instant,
        CompactReq<T>,
        Machine<K, V, T, D>,
        oneshot::Sender<Result<(), anyhow::Error>>,
    )>,
    _phantom: PhantomData<fn() -> D>,
}

impl<K, V, T, D> Clone for Compactor<K, V, T, D> {
    fn clone(&self) -> Self {
        Compactor {
            cfg: self.cfg.clone(),
            metrics: Arc::clone(&self.metrics),
            sender: self.sender.clone(),
            _phantom: Default::default(),
        }
    }
}

pub(crate) const COMPACTION_MINIMUM_TIMEOUT: Config<Duration> = Config::new(
    "persist_compaction_minimum_timeout",
    Duration::from_secs(90),
    "\
    The minimum amount of time to allow a persist compaction request to run \
    before timing it out (Materialize).",
);

pub(crate) const COMPACTION_CHECK_PROCESS_FLAG: Config<bool> = Config::new(
    "persist_compaction_check_process_flag",
    true,
    "Whether Compactor will obey the process_requests flag in PersistConfig, \
    which allows dynamically disabling compaction. If false, all compaction requests will be processed.",
);

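/// Collapses a set of spine ids into the contiguous
/// [CompactionInput::IdRange] that covers them.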
fn input_id_range(ids: BTreeSet<SpineId>) -> CompactionInput {
    let id = id_range(ids);

    CompactionInput::IdRange(id)
}

impl<K, V, T, D> Compactor<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64 + Sync,
    D: Monoid + Ord + Codec64 + Send + Sync,
{
    pub fn new(
        cfg: PersistConfig,
        metrics: Arc<Metrics>,
        gc: GarbageCollector<K, V, T, D>,
    ) -> Self {
        let (compact_req_sender, mut compact_req_receiver) = mpsc::channel::<(
            Instant,
            CompactReq<T>,
            Machine<K, V, T, D>,
            oneshot::Sender<Result<(), anyhow::Error>>,
        )>(cfg.compaction_queue_size);
        let concurrency_limit = Arc::new(tokio::sync::Semaphore::new(
            cfg.compaction_concurrency_limit,
        ));
        let check_process_requests = COMPACTION_CHECK_PROCESS_FLAG.handle(&cfg.configs);
        let process_requests = Arc::clone(&cfg.compaction_process_requests);

        let _worker_handle = mz_ore::task::spawn(|| "PersistCompactionScheduler", async move {
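            // The scheduler loop: pull requests off the queue, skip them when
            // compaction is dynamically disabled, acquire a concurrency
            // permit, and hand each request off to a worker task.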
            while let Some((enqueued, req, machine, completer)) = compact_req_receiver.recv().await
            {
                assert_eq!(req.shard_id, machine.shard_id());
                let metrics = Arc::clone(&machine.applier.metrics);

                if check_process_requests.get()
                    && !process_requests.load(std::sync::atomic::Ordering::Relaxed)
                {
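                    // Complete the request with an error so the caller isn't
                    // left waiting on a response that will never come.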
                    let _ = completer.send(Err(anyhow::anyhow!("compaction disabled")));
                    metrics.compaction.disabled.inc();
                    tracing::warn!(shard_id = ?req.shard_id, "Dropping compaction request on the floor.");

                    continue;
                }

                let permit = {
                    let inner = Arc::clone(&concurrency_limit);
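                    // Try to claim a permit without waiting, so we can track
                    // how often compaction backs up on the concurrency limit.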
                    match inner.try_acquire_owned() {
                        Ok(permit) => permit,
                        Err(TryAcquireError::NoPermits) => {
                            metrics.compaction.concurrency_waits.inc();
                            Arc::clone(&concurrency_limit)
                                .acquire_owned()
                                .await
                                .expect("semaphore is never closed")
                        }
                        Err(TryAcquireError::Closed) => {
                            warn!("semaphore for shard {} is closed", machine.shard_id());
                            continue;
                        }
                    }
                };
                metrics
                    .compaction
                    .queued_seconds
                    .inc_by(enqueued.elapsed().as_secs_f64());

                let compact_span =
                    debug_span!(parent: None, "compact::apply", shard_id=%machine.shard_id());
                compact_span.follows_from(&Span::current());
                let gc = gc.clone();
                mz_ore::task::spawn(|| "PersistCompactionWorker", async move {
                    let res = Self::compact_and_apply(&machine, req)
                        .instrument(compact_span)
                        .await;

                    match res {
                        Ok(maintenance) => maintenance.start_performing(&machine, &gc),
                        Err(err) => {
                            debug!(shard_id = ?machine.shard_id(), "compaction failed: {err:#}")
                        }
                    }

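                    // The caller may have dropped the receiver; it's fine to
                    // ignore send errors here.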
                    let _ = completer.send(Ok(()));

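                    // The permit is dropped only now, once compaction and its
                    // maintenance have finished, bounding how many requests
                    // run at once.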
                    drop(permit);
                });
            }
        });

        Compactor {
            cfg,
            metrics,
            sender: compact_req_sender,
            _phantom: PhantomData,
        }
    }

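    /// Enqueues a [CompactReq] to be processed in the background, returning a
    /// receiver that resolves once the compaction has been applied (or has
    /// failed).
    ///
    /// Returns `None` if the request is skipped by the compaction heuristics
    /// (too few input batches, parts, and updates for a merge to be
    /// worthwhile) or if the request queue is full; compaction is
    /// best-effort.
    ///
    /// A sketch of expected usage, assuming a `compactor`, `machine`, and
    /// `req` constructed elsewhere (marked `ignore` since it needs a live
    /// persist instance):
    ///
    /// ```ignore
    /// if let Some(done) = compactor.compact_and_apply_background(req, &machine) {
    ///     // Optionally wait for the result to be applied.
    ///     let _ = done.await;
    /// }
    /// ```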
    pub fn compact_and_apply_background(
        &self,
        req: CompactReq<T>,
        machine: &Machine<K, V, T, D>,
    ) -> Option<oneshot::Receiver<Result<(), anyhow::Error>>> {
        let should_compact = req.inputs.len() >= COMPACTION_HEURISTIC_MIN_INPUTS.get(&self.cfg)
            || req
                .inputs
                .iter()
                .map(|x| x.batch.part_count())
                .sum::<usize>()
                >= COMPACTION_HEURISTIC_MIN_PARTS.get(&self.cfg)
            || req.inputs.iter().map(|x| x.batch.len).sum::<usize>()
                >= COMPACTION_HEURISTIC_MIN_UPDATES.get(&self.cfg);
        if !should_compact {
            self.metrics.compaction.skipped.inc();
            return None;
        }

        let (compaction_completed_sender, compaction_completed_receiver) = oneshot::channel();
        let new_compaction_sender = self.sender.clone();

        self.metrics.compaction.requested.inc();
        let send = new_compaction_sender.try_send((
            Instant::now(),
            req,
            machine.clone(),
            compaction_completed_sender,
        ));
        if send.is_err() {
            self.metrics.compaction.dropped.inc();
            return None;
        }

        Some(compaction_completed_receiver)
    }

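    /// Runs the given compaction request to completion and applies the result
    /// to the machine's state, returning any maintenance work that falls out.
    ///
    /// The work runs on the isolated runtime under a timeout of at least
    /// [COMPACTION_MINIMUM_TIMEOUT], scaled up by one second per MiB of
    /// input.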
    pub(crate) async fn compact_and_apply(
        machine: &Machine<K, V, T, D>,
        req: CompactReq<T>,
    ) -> Result<RoutineMaintenance, anyhow::Error> {
        let metrics = Arc::clone(&machine.applier.metrics);
        metrics.compaction.started.inc();
        let start = Instant::now();

        let total_input_bytes = req
            .inputs
            .iter()
            .map(|batch| batch.batch.encoded_size_bytes())
            .sum::<usize>();
        let timeout = Duration::max(
            COMPACTION_MINIMUM_TIMEOUT.get(&machine.applier.cfg),
            Duration::from_secs(u64::cast_from(total_input_bytes / MiB)),
        );
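        // Compact with the most recent schema seen across the input runs, so
        // that all of the input data can be read and rewritten consistently.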
        let Some(compaction_schema_id) = req
            .inputs
            .iter()
            .flat_map(|batch| batch.batch.run_meta.iter())
            .filter_map(|run_meta| run_meta.schema)
            .max()
        else {
            metrics.compaction.schema_selection.no_schema.inc();
            metrics.compaction.failed.inc();
            return Err(anyhow!(
                "compacting {shard_id} and spine ids {spine_ids}: could not determine schema id from inputs",
                shard_id = req.shard_id,
                spine_ids = mz_ore::str::separated(", ", req.inputs.iter().map(|i| i.id))
            ));
        };
        let Some((key_schema, val_schema)) = machine.get_schema(compaction_schema_id) else {
            metrics.compaction.schema_selection.no_schema.inc();
            metrics.compaction.failed.inc();
            return Err(anyhow!(
                "compacting {shard_id} and spine ids {spine_ids}: schema id {compaction_schema_id} not present in machine state",
                shard_id = req.shard_id,
                spine_ids = mz_ore::str::separated(", ", req.inputs.iter().map(|i| i.id))
            ));
        };

        metrics.compaction.schema_selection.recent_schema.inc();

        let compaction_schema = Schemas {
            id: Some(compaction_schema_id),
            key: Arc::new(key_schema),
            val: Arc::new(val_schema),
        };

        trace!(
            "compaction request for {}MBs ({} bytes), with timeout of {}s, and schema {:?}.",
            total_input_bytes / MiB,
            total_input_bytes,
            timeout.as_secs_f64(),
            compaction_schema.id,
        );

        let isolated_runtime = Arc::clone(&machine.isolated_runtime);
        let machine_clone = machine.clone();
        let metrics_clone = Arc::clone(&machine.applier.metrics);
        let compact_span = debug_span!("compact::consolidate");
        let res = tokio::time::timeout(
            timeout,
            isolated_runtime.spawn_named(
                || "persist::compact::consolidate",
                async move {
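                    // Incremental compaction needs to be able to track runs
                    // across state versions, so it requires every input run
                    // to carry a unique id and a length.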
                    let all_runs_have_uuids = req
                        .inputs
                        .iter()
                        .all(|x| x.batch.runs().all(|(meta, _)| meta.id.is_some()));
                    let all_runs_have_len = req
                        .inputs
                        .iter()
                        .all(|x| x.batch.runs().all(|(meta, _)| meta.len.is_some()));

                    let compact_cfg =
                        CompactConfig::new(&machine_clone.applier.cfg, machine_clone.shard_id());
                    let incremental_enabled = compact_cfg.batch.enable_incremental_compaction
                        && all_runs_have_uuids
                        && all_runs_have_len;
                    let stream = Self::compact_stream(
                        compact_cfg,
                        Arc::clone(&machine_clone.applier.state_versions.blob),
                        Arc::clone(&metrics_clone),
                        Arc::clone(&machine_clone.applier.shard_metrics),
                        Arc::clone(&machine_clone.isolated_runtime),
                        req.clone(),
                        compaction_schema,
                        incremental_enabled,
                    );

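                    // With incremental compaction, apply each chunk's result
                    // as soon as it's produced; otherwise collect the whole
                    // stream into one batch and apply it at the end.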
                    let maintenance = if incremental_enabled {
                        let mut maintenance = RoutineMaintenance::default();
                        pin_mut!(stream);
                        while let Some(res) = stream.next().await {
                            let res = res?;
                            let new_maintenance =
                                Self::apply(res, &metrics_clone, &machine_clone).await?;
                            maintenance.merge(new_maintenance);
                        }
                        maintenance
                    } else {
                        let res = Self::compact_all(stream, req.clone()).await?;
                        Self::apply(
                            FueledMergeRes {
                                output: res.output,
                                input: CompactionInput::Legacy,
                                new_active_compaction: None,
                            },
                            &metrics_clone,
                            &machine_clone,
                        )
                        .await?
                    };

                    Ok::<_, anyhow::Error>(maintenance)
                }
                .instrument(compact_span),
            ),
        )
        .await;

        metrics
            .compaction
            .seconds
            .inc_by(start.elapsed().as_secs_f64());
        let res = res.map_err(|e| {
            metrics.compaction.timed_out.inc();
            anyhow!(
                "compaction timed out after {}s: {}",
                timeout.as_secs_f64(),
                e
            )
        })?;

        match res {
            Ok(maintenance) => Ok(maintenance),
            Err(err) => {
                metrics.compaction.failed.inc();
                debug!(
                    "compaction for {} failed: {}",
                    machine.shard_id(),
                    err.display_with_causes()
                );
                Err(err)
            }
        }
    }

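    /// Drains a compaction stream, concatenating everything it yields into a
    /// single [CompactRes] covering the original request.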
    pub async fn compact_all(
        stream: impl Stream<Item = Result<FueledMergeRes<T>, anyhow::Error>>,
        req: CompactReq<T>,
    ) -> Result<CompactRes<T>, anyhow::Error> {
        pin_mut!(stream);

        let mut all_parts = vec![];
        let mut all_run_splits = vec![];
        let mut all_run_meta = vec![];
        let mut len = 0;

        while let Some(res) = stream.next().await {
            let res = res?.output;
            let (parts, updates, run_meta, run_splits) =
                (res.parts, res.len, res.run_meta, res.run_splits);

            if updates == 0 {
                continue;
            }

            let run_offset = all_parts.len();
            if !all_parts.is_empty() {
                all_run_splits.push(run_offset);
            }
            all_run_splits.extend(run_splits.iter().map(|r| r + run_offset));
            all_run_meta.extend(run_meta);
            all_parts.extend(parts);
            len += updates;
        }

        let batches = req.inputs.iter().map(|x| x.id).collect::<BTreeSet<_>>();
        let input = input_id_range(batches);

        Ok(CompactRes {
            output: HollowBatch::new(
                req.desc.clone(),
                all_parts,
                len,
                all_run_meta,
                all_run_splits,
            ),
            input,
        })
    }

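    /// Applies a merge result to the machine's state, recording the outcome
    /// in metrics.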
    pub async fn apply(
        res: FueledMergeRes<T>,
        metrics: &Metrics,
        machine: &Machine<K, V, T, D>,
    ) -> Result<RoutineMaintenance, anyhow::Error> {
        let (apply_merge_result, maintenance) = machine.merge_res(&res).await;

        match &apply_merge_result {
            ApplyMergeResult::AppliedExact => {
                metrics.compaction.applied.inc();
                metrics.compaction.applied_exact_match.inc();
                machine.applier.shard_metrics.compaction_applied.inc();
            }
            ApplyMergeResult::AppliedSubset => {
                metrics.compaction.applied.inc();
                metrics.compaction.applied_subset_match.inc();
                machine.applier.shard_metrics.compaction_applied.inc();
            }
            ApplyMergeResult::NotAppliedNoMatch
            | ApplyMergeResult::NotAppliedInvalidSince
            | ApplyMergeResult::NotAppliedTooManyUpdates => {
                if let ApplyMergeResult::NotAppliedTooManyUpdates = &apply_merge_result {
                    metrics.compaction.not_applied_too_many_updates.inc();
                }
                metrics.compaction.noop.inc();
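                // None of the output parts made it into state; delete them so
                // they don't leak in blob storage.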
                let mut part_deletes = PartDeletes::default();
                for part in &res.output.parts {
                    part_deletes.add(part);
                }
                part_deletes
                    .delete(
                        machine.applier.state_versions.blob.as_ref(),
                        machine.shard_id(),
                        GC_BLOB_DELETE_CONCURRENCY_LIMIT.get(&machine.applier.cfg),
                        &*metrics,
                        &metrics.retries.external.compaction_noop_delete,
                    )
                    .await;
            }
        };

        Ok(maintenance)
    }

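    /// Compacts the input batches together in bounded memory, yielding a
    /// [FueledMergeRes] for each chunk of runs that gets compacted.
    ///
    /// The memory bound is broken into pieces:
    ///     1. in-progress work
    ///     2. fetching parts from runs
    ///     3. additional in-flight requests to Blob
    ///
    /// 1. In-progress work is bounded by 2 * [BatchBuilderConfig::blob_target_size].
    ///    This usage is met at two mutually exclusive moments: holding a
    ///    part's columnar data in memory while reading it in, and holding a
    ///    part's buffered updates while writing it back out.
    ///
    /// 2. When compacting runs, only one part from each run is held in memory
    ///    at a time, so [Self::chunk_runs] groups runs into chunks whose
    ///    largest parts together fit into the remaining budget.
    ///
    /// 3. Any budget left over after (1) and (2) is spent on additional
    ///    outstanding parts in-flight to Blob.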
    pub fn compact_stream(
        cfg: CompactConfig,
        blob: Arc<dyn Blob>,
        metrics: Arc<Metrics>,
        shard_metrics: Arc<ShardMetrics>,
        isolated_runtime: Arc<IsolatedRuntime>,
        req: CompactReq<T>,
        write_schemas: Schemas<K, V>,
        incremental_enabled: bool,
    ) -> impl Stream<Item = Result<FueledMergeRes<T>, anyhow::Error>> {
        async_stream::stream! {
            let () = Self::validate_req(&req)?;

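            // Compaction is cheapest when it only has to rewrite a single
            // run. Detect requests that would be eligible for such a fast
            // path: exactly one non-empty input batch, already consolidated
            // into a single run, with a non-minimum since. For now this only
            // feeds a metric.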
            let mut single_nonempty_batch = None;
            for batch in &req.inputs {
                if batch.batch.len > 0 {
                    match single_nonempty_batch {
                        None => single_nonempty_batch = Some(batch),
                        Some(_previous_nonempty_batch) => {
                            single_nonempty_batch = None;
                            break;
                        }
                    }
                }
            }
            if let Some(single_nonempty_batch) = single_nonempty_batch {
                if single_nonempty_batch.batch.run_splits.len() == 0
                    && single_nonempty_batch.batch.desc.since()
                        != &Antichain::from_elem(T::minimum())
                {
                    metrics.compaction.fast_path_eligible.inc();
                }
            }

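            // Reserve space for the two parts that can be in-progress at once
            // (one being read in, one being written out), then split whatever
            // remains of the memory bound across the runs being compacted.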
            let in_progress_part_reserved_memory_bytes = 2 * cfg.batch.blob_target_size;
            let run_reserved_memory_bytes = cfg
                .compaction_memory_bound_bytes
                .saturating_sub(in_progress_part_reserved_memory_bytes);

            let chunked_runs = Self::chunk_runs(
                &req,
                &cfg,
                &*metrics,
                run_reserved_memory_bytes,
                req.desc.since(),
            );
            let total_chunked_runs = chunked_runs.len();

            let parts_before = req.inputs.iter()
                .map(|x| x.batch.parts.len()).sum::<usize>();
            let parts_after = chunked_runs.iter()
                .flat_map(|(_, _, runs, _)| {
                    runs.iter().map(|(_, _, parts)| parts.len())
                })
                .sum::<usize>();
            assert_eq!(
                parts_before, parts_after,
                "chunking should not change the number of parts",
            );

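            // Compact each chunk of runs in turn. All but the last result are
            // marked with an [ActiveCompaction] start time, signalling that
            // more incremental results for this input are still on the way.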
            for (applied, (input, desc, runs, run_chunk_max_memory_usage)) in
                chunked_runs.into_iter().enumerate()
            {
                metrics.compaction.chunks_compacted.inc();
                metrics
                    .compaction
                    .runs_compacted
                    .inc_by(u64::cast_from(runs.len()));

                let extra_outstanding_parts = (run_reserved_memory_bytes
                    .saturating_sub(run_chunk_max_memory_usage))
                    / cfg.batch.blob_target_size;
                let mut run_cfg = cfg.clone();
                run_cfg.batch.batch_builder_max_outstanding_parts = 1 + extra_outstanding_parts;

                let desc = if incremental_enabled {
                    desc
                } else {
                    req.desc.clone()
                };

                let runs = runs.iter()
                    .map(|(desc, meta, run)| (*desc, *meta, *run))
                    .collect::<Vec<_>>();

                let batch = Self::compact_runs(
                    &run_cfg,
                    &req.shard_id,
                    &desc,
                    runs,
                    Arc::clone(&blob),
                    Arc::clone(&metrics),
                    Arc::clone(&shard_metrics),
                    Arc::clone(&isolated_runtime),
                    write_schemas.clone(),
                )
                .await?;

                assert!(
                    (batch.len == 0 && batch.parts.len() == 0)
                        || (batch.len > 0 && batch.parts.len() > 0),
                    "updates={}, parts={}",
                    batch.len,
                    batch.parts.len(),
                );

                let clock = cfg.now.clone();
                let active_compaction = if applied < total_chunked_runs - 1 {
                    Some(ActiveCompaction { start_ms: clock() })
                } else {
                    None
                };

                let res = CompactRes {
                    output: batch,
                    input,
                };

                let res = FueledMergeRes {
                    output: res.output,
                    new_active_compaction: active_compaction,
                    input: res.input,
                };

                yield Ok(res);
            }
        }
    }

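    /// One-shot compaction: runs [Self::compact_stream] over the request with
    /// incremental results disabled and concatenates the stream's output into
    /// a single [CompactRes].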
    pub async fn compact(
        cfg: CompactConfig,
        blob: Arc<dyn Blob>,
        metrics: Arc<Metrics>,
        shard_metrics: Arc<ShardMetrics>,
        isolated_runtime: Arc<IsolatedRuntime>,
        req: CompactReq<T>,
        write_schemas: Schemas<K, V>,
    ) -> Result<CompactRes<T>, anyhow::Error> {
        let stream = Self::compact_stream(
            cfg,
            Arc::clone(&blob),
            Arc::clone(&metrics),
            Arc::clone(&shard_metrics),
            Arc::clone(&isolated_runtime),
            req.clone(),
            write_schemas,
            false,
        );

        Self::compact_all(stream, req).await
    }

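    /// Groups the input runs into chunks that can each be compacted within
    /// the memory budget, estimating a run's cost by its largest part. Whole
    /// batches are packed together where possible; a batch too large to fit
    /// in one chunk is split into [CompactionInput::PartialBatch] chunks so
    /// that it can still be compacted piece by piece.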
    fn chunk_runs<'a>(
        req: &'a CompactReq<T>,
        cfg: &CompactConfig,
        metrics: &Metrics,
        run_reserved_memory_bytes: usize,
        since: &Antichain<T>,
    ) -> Vec<(
        CompactionInput,
        Description<T>,
        Vec<(&'a Description<T>, &'a RunMeta, &'a [RunPart<T>])>,
        usize,
    )> {
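        // Validate that the inputs cover a contiguous range of spine ids
        // (`id_range` checks this); the resulting value is unused here.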
        let _ = input_id_range(req.inputs.iter().map(|x| x.id).collect());

        let mut batches: Vec<_> = req.inputs.iter().map(|x| (x.id, &*x.batch)).collect();
        batches.sort_by_key(|(id, _)| *id);

        let mut chunks = vec![];
        let mut current_chunk_ids = BTreeSet::new();
        let mut current_chunk_descs = Vec::new();
        let mut current_chunk_runs = vec![];
        let mut current_chunk_max_memory_usage = 0;

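        // The memory cost of a run is dominated by its largest part, since
        // compaction holds at most one part per run in memory at a time.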
        fn max_part_bytes<T>(parts: &[RunPart<T>], cfg: &CompactConfig) -> usize {
            parts
                .iter()
                .map(|p| p.max_part_bytes())
                .max()
                .unwrap_or(cfg.batch.blob_target_size)
        }

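        // Combine a sequence of adjacent batch descriptions into a single
        // covering description with the given since.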
        fn desc_range<T: Timestamp>(
            descs: impl IntoIterator<Item = Description<T>>,
            since: Antichain<T>,
        ) -> Description<T> {
            let mut descs = descs.into_iter();
            let first = descs.next().expect("non-empty set of descriptions");
            let lower = first.lower().clone();
            let mut upper = first.upper().clone();
            for desc in descs {
                assert_eq!(&upper, desc.lower());
                upper = desc.upper().clone();
            }
            Description::new(lower, upper, since)
        }

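        // Greedily pack whole batches into the current chunk while the
        // estimated memory usage stays within the reservation; batches too
        // large to fit on their own are split up run by run below.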
        for (spine_id, batch) in batches {
            let batch_size = batch
                .runs()
                .map(|(_, parts)| max_part_bytes(parts, cfg))
                .sum::<usize>();

            let num_runs = batch.run_meta.len();

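            // Runs that aren't in the preferred output order can't be fed to
            // the consolidator as a unit; treat each of their parts as its
            // own run so the data gets freshly re-sorted.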
            let runs = batch.runs().flat_map(|(meta, parts)| {
                if meta.order.unwrap_or(RunOrder::Codec) == cfg.batch.preferred_order {
                    Either::Left(std::iter::once((&batch.desc, meta, parts)))
                } else {
                    soft_assert_or_log!(
                        !parts.iter().any(|r| matches!(r, RunPart::Many(_))),
                        "unexpected out-of-order hollow run"
                    );
                    Either::Right(
                        parts
                            .iter()
                            .map(move |p| (&batch.desc, meta, std::slice::from_ref(p))),
                    )
                }
            });

            if current_chunk_max_memory_usage + batch_size <= run_reserved_memory_bytes
                || current_chunk_runs.len() + num_runs <= 2
            {
                if current_chunk_max_memory_usage + batch_size > run_reserved_memory_bytes {
                    metrics.compaction.memory_violations.inc();
                }
                current_chunk_ids.insert(spine_id);
                current_chunk_descs.push(batch.desc.clone());
                current_chunk_runs.extend(runs);
                current_chunk_max_memory_usage += batch_size;
                continue;
            }

            if !current_chunk_ids.is_empty() {
                chunks.push((
                    input_id_range(std::mem::take(&mut current_chunk_ids)),
                    desc_range(mem::take(&mut current_chunk_descs), since.clone()),
                    std::mem::take(&mut current_chunk_runs),
                    current_chunk_max_memory_usage,
                ));
                current_chunk_max_memory_usage = 0;
            }

            if batch_size <= run_reserved_memory_bytes {
                current_chunk_ids.insert(spine_id);
                current_chunk_descs.push(batch.desc.clone());
                current_chunk_runs.extend(runs);
                current_chunk_max_memory_usage += batch_size;
                continue;
            }

            let mut run_iter = runs.into_iter().peekable();
            debug_assert!(current_chunk_ids.is_empty());
            debug_assert!(current_chunk_descs.is_empty());
            debug_assert!(current_chunk_runs.is_empty());
            debug_assert_eq!(current_chunk_max_memory_usage, 0);
            let mut current_chunk_run_ids = BTreeSet::new();

            while let Some((desc, meta, parts)) = run_iter.next() {
                let run_size = max_part_bytes(parts, cfg);
                current_chunk_runs.push((desc, meta, parts));
                current_chunk_max_memory_usage += run_size;
                current_chunk_run_ids.extend(meta.id);

                if let Some((_, _meta, next_parts)) = run_iter.peek() {
                    let next_size = max_part_bytes(next_parts, cfg);
                    if current_chunk_max_memory_usage + next_size > run_reserved_memory_bytes {
                        if current_chunk_runs.len() == 1 {
                            metrics.compaction.memory_violations.inc();
                            continue;
                        }
                        chunks.push((
                            CompactionInput::PartialBatch(
                                spine_id,
                                mem::take(&mut current_chunk_run_ids),
                            ),
                            desc_range([batch.desc.clone()], since.clone()),
                            std::mem::take(&mut current_chunk_runs),
                            current_chunk_max_memory_usage,
                        ));
                        current_chunk_max_memory_usage = 0;
                    }
                }
            }

            if !current_chunk_runs.is_empty() {
                chunks.push((
                    CompactionInput::PartialBatch(spine_id, mem::take(&mut current_chunk_run_ids)),
                    desc_range([batch.desc.clone()], since.clone()),
                    std::mem::take(&mut current_chunk_runs),
                    current_chunk_max_memory_usage,
                ));
                current_chunk_max_memory_usage = 0;
            }
        }

        if !current_chunk_ids.is_empty() {
            chunks.push((
                input_id_range(current_chunk_ids),
                desc_range(current_chunk_descs, since.clone()),
                current_chunk_runs,
                current_chunk_max_memory_usage,
            ));
        }

        chunks
    }

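    /// Compacts the given runs into a single batch: the runs are streamed
    /// through a [Consolidator] and the consolidated updates are written back
    /// out in roughly `blob_target_size` parts, with times forwarded to
    /// `desc.since()`.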
    pub(crate) async fn compact_runs(
        cfg: &CompactConfig,
        shard_id: &ShardId,
        desc: &Description<T>,
        runs: Vec<(&Description<T>, &RunMeta, &[RunPart<T>])>,
        blob: Arc<dyn Blob>,
        metrics: Arc<Metrics>,
        shard_metrics: Arc<ShardMetrics>,
        isolated_runtime: Arc<IsolatedRuntime>,
        write_schemas: Schemas<K, V>,
    ) -> Result<HollowBatch<T>, anyhow::Error> {
        let prefetch_budget_bytes = 2 * cfg.batch.blob_target_size;

        let mut timings = Timings::default();

        let mut batch_cfg = cfg.batch.clone();
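        // Never write inline parts from compaction output: flushing updates
        // out to blob keeps state small.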
        batch_cfg.inline_writes_single_max_bytes = 0;

        let parts = BatchParts::new_ordered::<D>(
            batch_cfg,
            cfg.batch.preferred_order,
            Arc::clone(&metrics),
            Arc::clone(&shard_metrics),
            *shard_id,
            Arc::clone(&blob),
            Arc::clone(&isolated_runtime),
            &metrics.compaction.batch,
        );
        let mut batch = BatchBuilderInternal::<K, V, T, D>::new(
            cfg.batch.clone(),
            parts,
            Arc::clone(&metrics),
            write_schemas.clone(),
            Arc::clone(&blob),
            shard_id.clone(),
            cfg.version.clone(),
        );

        let mut consolidator = Consolidator::new(
            format!(
                "{}[lower={:?},upper={:?}]",
                shard_id,
                desc.lower().elements(),
                desc.upper().elements()
            ),
            cfg.fetch_config.clone(),
            *shard_id,
            StructuredSort::<K, V, T, D>::new(write_schemas.clone()),
            blob,
            Arc::clone(&metrics),
            shard_metrics,
            metrics.read.compaction.clone(),
            FetchBatchFilter::Compaction {
                since: desc.since().clone(),
            },
            None,
            prefetch_budget_bytes,
        );

        for (desc, meta, parts) in runs {
            consolidator.enqueue_run(desc, meta, parts.iter().cloned());
        }

        let remaining_budget = consolidator.start_prefetches();
        if remaining_budget.is_none() {
            metrics.compaction.not_all_prefetched.inc();
        }

        loop {
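            // Repeatedly fetch chunks of consolidated updates until we've
            // accumulated roughly a blob's worth, then flush them out as a
            // single part.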
            let mut chunks = vec![];
            let mut total_bytes = 0;
            while total_bytes < cfg.batch.blob_target_size {
                let fetch_start = Instant::now();
                let Some(chunk) = consolidator
                    .next_chunk(
                        cfg.compaction_yield_after_n_updates,
                        cfg.batch.blob_target_size - total_bytes,
                    )
                    .await?
                else {
                    break;
                };
                timings.part_fetching += fetch_start.elapsed();
                total_bytes += chunk.goodbytes();
                chunks.push(chunk);
                tokio::task::yield_now().await;
            }

            let Some(updates) = Part::concat(&chunks).expect("compaction produces well-typed data")
            else {
                break;
            };
            batch.flush_part(desc.clone(), updates).await;
        }
        let mut batch = batch.finish(desc.clone()).await?;

        let has_inline_parts = batch.batch.parts.iter().any(|x| x.is_inline());
        if has_inline_parts {
            error!(%shard_id, ?cfg, "compaction result unexpectedly had inline writes");
            let () = batch
                .flush_to_blob(
                    &cfg.batch,
                    &metrics.compaction.batch,
                    &isolated_runtime,
                    &write_schemas,
                )
                .await;
        }

        timings.record(&metrics);
        Ok(batch.into_hollow_batch())
    }

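    /// Sanity-checks a [CompactReq]: the inputs must be contiguous (each
    /// batch's upper matching the next batch's lower) and cover exactly the
    /// requested bounds, and no input's since may be in advance of the
    /// requested output since.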
    fn validate_req(req: &CompactReq<T>) -> Result<(), anyhow::Error> {
        let mut frontier = req.desc.lower();
        for input in req.inputs.iter() {
            if PartialOrder::less_than(req.desc.since(), input.batch.desc.since()) {
                return Err(anyhow!(
                    "output since {:?} must be at or in advance of input since {:?}",
                    req.desc.since(),
                    input.batch.desc.since()
                ));
            }
            if frontier != input.batch.desc.lower() {
                return Err(anyhow!(
                    "invalid merge of non-consecutive batches {:?} vs {:?}",
                    frontier,
                    input.batch.desc.lower()
                ));
            }
            frontier = input.batch.desc.upper();
        }
        if frontier != req.desc.upper() {
            return Err(anyhow!(
                "invalid merge of non-consecutive batches {:?} vs {:?}",
                frontier,
                req.desc.upper()
            ));
        }
        Ok(())
    }
}

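/// Timing breakdowns for a single compaction, reported to metrics on
/// completion.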
#[derive(Debug, Default)]
struct Timings {
    part_fetching: Duration,
    heap_population: Duration,
}

impl Timings {
    fn record(self, metrics: &Metrics) {
        let Timings {
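            // Destructure exhaustively so that adding a field forces this
            // method to be updated.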
            part_fetching,
            heap_population,
        } = self;

        metrics
            .compaction
            .steps
            .part_fetch_seconds
            .inc_by(part_fetching.as_secs_f64());
        metrics
            .compaction
            .steps
            .heap_population_seconds
            .inc_by(heap_population.as_secs_f64());
    }
}

#[cfg(test)]
mod tests {
    use mz_dyncfg::ConfigUpdates;
    use mz_ore::{assert_contains, assert_err};
    use mz_persist_types::codec_impls::StringSchema;
    use timely::progress::Antichain;

    use crate::PersistLocation;
    use crate::batch::BLOB_TARGET_SIZE;
    use crate::internal::trace::SpineId;
    use crate::tests::{all_ok, expect_fetch_part, new_test_client_cache};

    use super::*;

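    // Regression test: a compaction request whose output since is in advance
    // of the minimum must forward update timestamps to that since (here 10)
    // rather than assuming the minimum antichain.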
    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn regression_minimum_since(dyncfgs: ConfigUpdates) {
        let data = vec![
            (("0".to_owned(), "zero".to_owned()), 0, 1),
            (("0".to_owned(), "zero".to_owned()), 1, -1),
            (("1".to_owned(), "one".to_owned()), 1, 1),
        ];

        let cache = new_test_client_cache(&dyncfgs);
        cache.cfg.set_config(&BLOB_TARGET_SIZE, 100);
        let (mut write, _) = cache
            .open(PersistLocation::new_in_mem())
            .await
            .expect("client construction failed")
            .expect_open::<String, String, u64, i64>(ShardId::new())
            .await;
        let b0 = write
            .expect_batch(&data[..1], 0, 1)
            .await
            .into_hollow_batch();
        let b1 = write
            .expect_batch(&data[1..], 1, 2)
            .await
            .into_hollow_batch();

        let req = CompactReq {
            shard_id: write.machine.shard_id(),
            desc: Description::new(
                b0.desc.lower().clone(),
                b1.desc.upper().clone(),
                Antichain::from_elem(10u64),
            ),
            inputs: vec![
                IdHollowBatch {
                    batch: Arc::new(b0),
                    id: SpineId(0, 1),
                },
                IdHollowBatch {
                    batch: Arc::new(b1),
                    id: SpineId(1, 2),
                },
            ],
        };
        let schemas = Schemas {
            id: None,
            key: Arc::new(StringSchema),
            val: Arc::new(StringSchema),
        };
        let res = Compactor::<String, String, u64, i64>::compact(
            CompactConfig::new(&write.cfg, write.shard_id()),
            Arc::clone(&write.blob),
            Arc::clone(&write.metrics),
            write.metrics.shards.shard(&write.machine.shard_id(), ""),
            Arc::new(IsolatedRuntime::new_for_tests()),
            req.clone(),
            schemas.clone(),
        )
        .await
        .expect("compaction failed");

        assert_eq!(res.output.desc, req.desc);
        assert_eq!(res.output.len, 1);
        assert_eq!(res.output.part_count(), 1);
        let part = res.output.parts[0].expect_hollow_part();
        let (part, updates) = expect_fetch_part(
            write.blob.as_ref(),
            &part.key.complete(&write.machine.shard_id()),
            &write.metrics,
            &schemas,
        )
        .await;
        assert_eq!(part.desc, res.output.desc);
        assert_eq!(updates, all_ok(&data, 10));
    }

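    // Ensure that compaction requests are rejected while compaction is
    // dynamically disabled, and are processed again once it's re-enabled or
    // once the process-flag check itself is turned off.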
    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn disable_compaction(dyncfgs: ConfigUpdates) {
        let data = [
            (("0".to_owned(), "zero".to_owned()), 0, 1),
            (("0".to_owned(), "zero".to_owned()), 1, -1),
            (("1".to_owned(), "one".to_owned()), 1, 1),
        ];

        let cache = new_test_client_cache(&dyncfgs);
        cache.cfg.set_config(&BLOB_TARGET_SIZE, 100);
        let (mut write, _) = cache
            .open(PersistLocation::new_in_mem())
            .await
            .expect("client construction failed")
            .expect_open::<String, String, u64, i64>(ShardId::new())
            .await;
        let b0 = write
            .expect_batch(&data[..1], 0, 1)
            .await
            .into_hollow_batch();
        let b1 = write
            .expect_batch(&data[1..], 1, 2)
            .await
            .into_hollow_batch();

        let req = CompactReq {
            shard_id: write.machine.shard_id(),
            desc: Description::new(
                b0.desc.lower().clone(),
                b1.desc.upper().clone(),
                Antichain::from_elem(10u64),
            ),
            inputs: vec![
                IdHollowBatch {
                    batch: Arc::new(b0),
                    id: SpineId(0, 1),
                },
                IdHollowBatch {
                    batch: Arc::new(b1),
                    id: SpineId(1, 2),
                },
            ],
        };
        write.cfg.set_config(&COMPACTION_HEURISTIC_MIN_INPUTS, 1);
        let compactor = write.compact.as_ref().expect("compaction hard disabled");

        write.cfg.disable_compaction();
        let result = compactor
            .compact_and_apply_background(req.clone(), &write.machine)
            .expect("listener")
            .await
            .expect("channel closed");
        assert_err!(result);
        assert_contains!(result.unwrap_err().to_string(), "compaction disabled");

        write.cfg.enable_compaction();
        compactor
            .compact_and_apply_background(req, &write.machine)
            .expect("listener")
            .await
            .expect("channel closed")
            .expect("compaction success");

        let data2 = [
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("2".to_owned(), "two".to_owned()), 3, -1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
        ];

        let b2 = write
            .expect_batch(&data2[..1], 2, 3)
            .await
            .into_hollow_batch();
        let b3 = write
            .expect_batch(&data2[1..], 3, 4)
            .await
            .into_hollow_batch();

        let req = CompactReq {
            shard_id: write.machine.shard_id(),
            desc: Description::new(
                b2.desc.lower().clone(),
                b3.desc.upper().clone(),
                Antichain::from_elem(20u64),
            ),
            inputs: vec![
                IdHollowBatch {
                    batch: Arc::new(b2),
                    id: SpineId(0, 1),
                },
                IdHollowBatch {
                    batch: Arc::new(b3),
                    id: SpineId(1, 2),
                },
            ],
        };
        let compactor = write.compact.as_ref().expect("compaction hard disabled");

        write.cfg.set_config(&COMPACTION_CHECK_PROCESS_FLAG, false);
        write.cfg.disable_compaction();
        compactor
            .compact_and_apply_background(req, &write.machine)
            .expect("listener")
            .await
            .expect("channel closed")
            .expect("compaction success");
    }
}
1347}