use std::collections::BTreeSet;
use std::fmt::Debug;
use std::marker::PhantomData;
use std::mem;
use std::sync::Arc;
use std::time::{Duration, Instant};

use anyhow::anyhow;
use differential_dataflow::difference::Monoid;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::trace::Description;
use futures::{Stream, pin_mut};
use futures_util::StreamExt;
use itertools::Either;
use mz_dyncfg::Config;
use mz_ore::cast::CastFrom;
use mz_ore::error::ErrorExt;
use mz_ore::now::NowFn;
use mz_ore::soft_assert_or_log;
use mz_persist::location::Blob;
use mz_persist_types::part::Part;
use mz_persist_types::{Codec, Codec64};
use timely::PartialOrder;
use timely::progress::{Antichain, Timestamp};
use tokio::sync::mpsc::Sender;
use tokio::sync::{TryAcquireError, mpsc, oneshot};
use tracing::{Instrument, Span, debug, debug_span, error, trace, warn};

use crate::async_runtime::IsolatedRuntime;
use crate::batch::{BatchBuilderConfig, BatchBuilderInternal, BatchParts, PartDeletes};
use crate::cfg::{
    COMPACTION_HEURISTIC_MIN_INPUTS, COMPACTION_HEURISTIC_MIN_PARTS,
    COMPACTION_HEURISTIC_MIN_UPDATES, COMPACTION_MEMORY_BOUND_BYTES,
    GC_BLOB_DELETE_CONCURRENCY_LIMIT, MiB,
};
use crate::fetch::{FetchBatchFilter, FetchConfig};
use crate::internal::encoding::Schemas;
use crate::internal::gc::GarbageCollector;
use crate::internal::machine::Machine;
use crate::internal::maintenance::RoutineMaintenance;
use crate::internal::metrics::ShardMetrics;
use crate::internal::state::{HollowBatch, RunMeta, RunOrder, RunPart};
use crate::internal::trace::{
    ActiveCompaction, ApplyMergeResult, CompactionInput, FueledMergeRes, IdHollowBatch, SpineId,
    id_range,
};
use crate::iter::{Consolidator, StructuredSort};
use crate::{Metrics, PersistConfig, ShardId};

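/// A request to compact a set of batches of a shard into a single batch
/// covering the requested description.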
#[derive(Debug, Clone)]
pub struct CompactReq<T> {
    /// The shard the input and output batches belong to.
    pub shard_id: ShardId,
    /// A description for the output batch.
    pub desc: Description<T>,
    /// The batches to compact.
    pub inputs: Vec<IdHollowBatch<T>>,
}

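/// A response from compaction: the output batch plus a record of which inputs
/// it replaces.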
#[derive(Debug)]
pub struct CompactRes<T> {
    /// The compacted batch.
    pub output: HollowBatch<T>,
    /// The inputs that were compacted into the output.
    pub input: CompactionInput,
}

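/// A bundle of the configuration that compaction needs, snapshotted from
/// [`PersistConfig`] at the start of a compaction request.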
#[derive(Debug, Clone)]
pub struct CompactConfig {
    pub(crate) compaction_memory_bound_bytes: usize,
    pub(crate) compaction_yield_after_n_updates: usize,
    pub(crate) version: semver::Version,
    pub(crate) batch: BatchBuilderConfig,
    pub(crate) fetch_config: FetchConfig,
    pub(crate) now: NowFn,
}

impl CompactConfig {
    pub fn new(value: &PersistConfig, shard_id: ShardId) -> Self {
        CompactConfig {
            compaction_memory_bound_bytes: COMPACTION_MEMORY_BOUND_BYTES.get(value),
            compaction_yield_after_n_updates: value.compaction_yield_after_n_updates,
            version: value.build_version.clone(),
            batch: BatchBuilderConfig::new(value, shard_id),
            fetch_config: FetchConfig::from_persist_config(value),
            now: value.now.clone(),
        }
    }
}

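/// A service for performing requested compactions in the background.
///
/// Requests are queued on a bounded channel and serviced by background tasks,
/// with concurrency limited by a semaphore; see [`Compactor::new`].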
#[derive(Debug)]
pub struct Compactor<K, V, T, D> {
    cfg: PersistConfig,
    metrics: Arc<Metrics>,
    sender: Sender<(
        Instant,
        CompactReq<T>,
        Machine<K, V, T, D>,
        oneshot::Sender<Result<(), anyhow::Error>>,
    )>,
    _phantom: PhantomData<fn() -> D>,
}

impl<K, V, T, D> Clone for Compactor<K, V, T, D> {
    fn clone(&self) -> Self {
        Compactor {
            cfg: self.cfg.clone(),
            metrics: Arc::clone(&self.metrics),
            sender: self.sender.clone(),
            _phantom: Default::default(),
        }
    }
}

pub(crate) const COMPACTION_MINIMUM_TIMEOUT: Config<Duration> = Config::new(
    "persist_compaction_minimum_timeout",
    Duration::from_secs(90),
    "\
    The minimum amount of time to allow a persist compaction request to run \
    before timing it out (Materialize).",
);

pub(crate) const COMPACTION_CHECK_PROCESS_FLAG: Config<bool> = Config::new(
    "persist_compaction_check_process_flag",
    true,
    "Whether Compactor will obey the process_requests flag in PersistConfig, \
    which allows dynamically disabling compaction. If false, all compaction requests will be processed.",
);

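/// Wraps a set of spine ids as a [`CompactionInput::IdRange`] covering them.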
fn input_id_range(ids: BTreeSet<SpineId>) -> CompactionInput {
    let id = id_range(ids);

    CompactionInput::IdRange(id)
}

impl<K, V, T, D> Compactor<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64 + Sync,
    D: Monoid + Ord + Codec64 + Send + Sync,
{
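    /// Creates a new [`Compactor`] and spawns the background task that
    /// schedules queued compaction requests and runs them on worker tasks.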
    pub fn new(
        cfg: PersistConfig,
        metrics: Arc<Metrics>,
        gc: GarbageCollector<K, V, T, D>,
    ) -> Self {
        let (compact_req_sender, mut compact_req_receiver) = mpsc::channel::<(
            Instant,
            CompactReq<T>,
            Machine<K, V, T, D>,
            oneshot::Sender<Result<(), anyhow::Error>>,
        )>(cfg.compaction_queue_size);
        let concurrency_limit = Arc::new(tokio::sync::Semaphore::new(
            cfg.compaction_concurrency_limit,
        ));
        let check_process_requests = COMPACTION_CHECK_PROCESS_FLAG.handle(&cfg.configs);
        let process_requests = Arc::clone(&cfg.compaction_process_requests);

        let _worker_handle = mz_ore::task::spawn(|| "PersistCompactionScheduler", async move {
            while let Some((enqueued, req, machine, completer)) = compact_req_receiver.recv().await
            {
                assert_eq!(req.shard_id, machine.shard_id());
                let metrics = Arc::clone(&machine.applier.metrics);

                if check_process_requests.get()
                    && !process_requests.load(std::sync::atomic::Ordering::Relaxed)
                {
                    let _ = completer.send(Err(anyhow::anyhow!("compaction disabled")));
                    metrics.compaction.disabled.inc();
                    tracing::warn!(shard_id = ?req.shard_id, "Dropping compaction request on the floor.");

                    continue;
                }

                let permit = {
                    let inner = Arc::clone(&concurrency_limit);
                    match inner.try_acquire_owned() {
                        Ok(permit) => permit,
                        Err(TryAcquireError::NoPermits) => {
                            metrics.compaction.concurrency_waits.inc();
                            Arc::clone(&concurrency_limit)
                                .acquire_owned()
                                .await
                                .expect("semaphore is never closed")
                        }
                        Err(TryAcquireError::Closed) => {
                            warn!("semaphore for shard {} is closed", machine.shard_id());
                            continue;
                        }
                    }
                };
                metrics
                    .compaction
                    .queued_seconds
                    .inc_by(enqueued.elapsed().as_secs_f64());

                let compact_span =
                    debug_span!(parent: None, "compact::apply", shard_id=%machine.shard_id());
                compact_span.follows_from(&Span::current());
                let gc = gc.clone();
                mz_ore::task::spawn(|| "PersistCompactionWorker", async move {
                    let res = Self::compact_and_apply(&machine, req)
                        .instrument(compact_span)
                        .await;

                    match res {
                        Ok(maintenance) => maintenance.start_performing(&machine, &gc),
                        Err(err) => {
                            debug!(shard_id =? machine.shard_id(), "compaction failed: {err:#}")
                        }
                    }

                    let _ = completer.send(Ok(()));

                    drop(permit);
                });
            }
        });

        Compactor {
            cfg,
            metrics,
            sender: compact_req_sender,
            _phantom: PhantomData,
        }
    }

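    /// Enqueues a [`CompactReq`] to be processed in the background, if it
    /// looks worth compacting according to the configured heuristics.
    ///
    /// Returns a receiver that resolves once the request has been worked off,
    /// or `None` if the request was skipped or the queue was full.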
    pub fn compact_and_apply_background(
        &self,
        req: CompactReq<T>,
        machine: &Machine<K, V, T, D>,
    ) -> Option<oneshot::Receiver<Result<(), anyhow::Error>>> {
        let should_compact = req.inputs.len() >= COMPACTION_HEURISTIC_MIN_INPUTS.get(&self.cfg)
            || req
                .inputs
                .iter()
                .map(|x| x.batch.part_count())
                .sum::<usize>()
                >= COMPACTION_HEURISTIC_MIN_PARTS.get(&self.cfg)
            || req.inputs.iter().map(|x| x.batch.len).sum::<usize>()
                >= COMPACTION_HEURISTIC_MIN_UPDATES.get(&self.cfg);
        if !should_compact {
            self.metrics.compaction.skipped.inc();
            return None;
        }

        let (compaction_completed_sender, compaction_completed_receiver) = oneshot::channel();
        let new_compaction_sender = self.sender.clone();

        self.metrics.compaction.requested.inc();
        let send = new_compaction_sender.try_send((
            Instant::now(),
            req,
            machine.clone(),
            compaction_completed_sender,
        ));
        if let Err(_) = send {
            self.metrics.compaction.dropped.inc();
            return None;
        }

        Some(compaction_completed_receiver)
    }

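    /// Runs the compaction described by `req` and applies the result to the
    /// shard's state, returning any routine maintenance the application
    /// produced.
    ///
    /// The work runs on the isolated runtime and is bounded by a timeout that
    /// scales with the total encoded size of the inputs.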
    pub(crate) async fn compact_and_apply(
        machine: &Machine<K, V, T, D>,
        req: CompactReq<T>,
    ) -> Result<RoutineMaintenance, anyhow::Error> {
        let metrics = Arc::clone(&machine.applier.metrics);
        metrics.compaction.started.inc();
        let start = Instant::now();

        let total_input_bytes = req
            .inputs
            .iter()
            .map(|batch| batch.batch.encoded_size_bytes())
            .sum::<usize>();
        let timeout = Duration::max(
            COMPACTION_MINIMUM_TIMEOUT.get(&machine.applier.cfg),
            Duration::from_secs(u64::cast_from(total_input_bytes / MiB)),
        );
        let Some(compaction_schema_id) = req
            .inputs
            .iter()
            .flat_map(|batch| batch.batch.run_meta.iter())
            .filter_map(|run_meta| run_meta.schema)
            .max()
        else {
            metrics.compaction.schema_selection.no_schema.inc();
            metrics.compaction.failed.inc();
            return Err(anyhow!(
                "compacting {shard_id} and spine ids {spine_ids}: could not determine schema id from inputs",
                shard_id = req.shard_id,
                spine_ids = mz_ore::str::separated(", ", req.inputs.iter().map(|i| i.id))
            ));
        };
        let Some((key_schema, val_schema)) = machine.get_schema(compaction_schema_id) else {
            metrics.compaction.schema_selection.no_schema.inc();
            metrics.compaction.failed.inc();
            return Err(anyhow!(
                "compacting {shard_id} and spine ids {spine_ids}: schema id {compaction_schema_id} not present in machine state",
                shard_id = req.shard_id,
                spine_ids = mz_ore::str::separated(", ", req.inputs.iter().map(|i| i.id))
            ));
        };

        metrics.compaction.schema_selection.recent_schema.inc();

        let compaction_schema = Schemas {
            id: Some(compaction_schema_id),
            key: Arc::new(key_schema),
            val: Arc::new(val_schema),
        };

        trace!(
            "compaction request for {}MBs ({} bytes), with timeout of {}s, and schema {:?}.",
            total_input_bytes / MiB,
            total_input_bytes,
            timeout.as_secs_f64(),
            compaction_schema.id,
        );

        let isolated_runtime = Arc::clone(&machine.isolated_runtime);
        let machine_clone = machine.clone();
        let metrics_clone = Arc::clone(&machine.applier.metrics);
        let compact_span = debug_span!("compact::consolidate");
        let res = tokio::time::timeout(
            timeout,
            isolated_runtime.spawn_named(
                || "persist::compact::consolidate",
                async move {
                    let all_runs_have_uuids = req
                        .inputs
                        .iter()
                        .all(|x| x.batch.runs().all(|(meta, _)| meta.id.is_some()));
                    let all_runs_have_len = req
                        .inputs
                        .iter()
                        .all(|x| x.batch.runs().all(|(meta, _)| meta.len.is_some()));

                    let compact_cfg =
                        CompactConfig::new(&machine_clone.applier.cfg, machine_clone.shard_id());
                    let incremental_enabled = compact_cfg.batch.enable_incremental_compaction
                        && all_runs_have_uuids
                        && all_runs_have_len;
                    let stream = Self::compact_stream(
                        compact_cfg,
                        Arc::clone(&machine_clone.applier.state_versions.blob),
                        Arc::clone(&metrics_clone),
                        Arc::clone(&machine_clone.applier.shard_metrics),
                        Arc::clone(&machine_clone.isolated_runtime),
                        req.clone(),
                        compaction_schema,
                        incremental_enabled,
                    );

                    let maintenance = if incremental_enabled {
                        let mut maintenance = RoutineMaintenance::default();
                        pin_mut!(stream);
                        while let Some(res) = stream.next().await {
                            let res = res?;
                            let new_maintenance =
                                Self::apply(res, &metrics_clone, &machine_clone).await?;
                            maintenance.merge(new_maintenance);
                        }
                        maintenance
                    } else {
                        let res = Self::compact_all(stream, req.clone()).await?;
                        Self::apply(
                            FueledMergeRes {
                                output: res.output,
                                input: CompactionInput::Legacy,
                                new_active_compaction: None,
                            },
                            &metrics_clone,
                            &machine_clone,
                        )
                        .await?
                    };

                    Ok::<_, anyhow::Error>(maintenance)
                }
                .instrument(compact_span),
            ),
        )
        .await;

        metrics
            .compaction
            .seconds
            .inc_by(start.elapsed().as_secs_f64());
        let res = res
            .map_err(|e| {
                metrics.compaction.timed_out.inc();
                anyhow!(
                    "compaction timed out after {}s: {}",
                    timeout.as_secs_f64(),
                    e
                )
            })?
            .map_err(|e| anyhow!(e))?;

        match res {
            Ok(maintenance) => Ok(maintenance),
            Err(err) => {
                metrics.compaction.failed.inc();
                debug!(
                    "compaction for {} failed: {}",
                    machine.shard_id(),
                    err.display_with_causes()
                );
                Err(err)
            }
        }
    }

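    /// Drains a compaction stream, concatenating the non-empty outputs into a
    /// single [`HollowBatch`] that covers the full requested description.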
    pub async fn compact_all(
        stream: impl Stream<Item = Result<FueledMergeRes<T>, anyhow::Error>>,
        req: CompactReq<T>,
    ) -> Result<CompactRes<T>, anyhow::Error> {
        pin_mut!(stream);

        let mut all_parts = vec![];
        let mut all_run_splits = vec![];
        let mut all_run_meta = vec![];
        let mut len = 0;

        while let Some(res) = stream.next().await {
            let res = res?.output;
            let (parts, updates, run_meta, run_splits) =
                (res.parts, res.len, res.run_meta, res.run_splits);

            if updates == 0 {
                continue;
            }

            let run_offset = all_parts.len();
            if !all_parts.is_empty() {
                all_run_splits.push(run_offset);
            }
            all_run_splits.extend(run_splits.iter().map(|r| r + run_offset));
            all_run_meta.extend(run_meta);
            all_parts.extend(parts);
            len += updates;
        }

        let batches = req.inputs.iter().map(|x| x.id).collect::<BTreeSet<_>>();
        let input = input_id_range(batches);

        Ok(CompactRes {
            output: HollowBatch::new(
                req.desc.clone(),
                all_parts,
                len,
                all_run_meta,
                all_run_splits,
            ),
            input,
        })
    }

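    /// Applies a merge result to the shard's state, deleting the
    /// now-unreferenced output parts if the result could not be applied.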
    pub async fn apply(
        res: FueledMergeRes<T>,
        metrics: &Metrics,
        machine: &Machine<K, V, T, D>,
    ) -> Result<RoutineMaintenance, anyhow::Error> {
        let (apply_merge_result, maintenance) = machine.merge_res(&res).await;

        match &apply_merge_result {
            ApplyMergeResult::AppliedExact => {
                metrics.compaction.applied.inc();
                metrics.compaction.applied_exact_match.inc();
                machine.applier.shard_metrics.compaction_applied.inc();
            }
            ApplyMergeResult::AppliedSubset => {
                metrics.compaction.applied.inc();
                metrics.compaction.applied_subset_match.inc();
                machine.applier.shard_metrics.compaction_applied.inc();
            }
            ApplyMergeResult::NotAppliedNoMatch
            | ApplyMergeResult::NotAppliedInvalidSince
            | ApplyMergeResult::NotAppliedTooManyUpdates => {
                if let ApplyMergeResult::NotAppliedTooManyUpdates = &apply_merge_result {
                    metrics.compaction.not_applied_too_many_updates.inc();
                }
                metrics.compaction.noop.inc();
                let mut part_deletes = PartDeletes::default();
                for part in &res.output.parts {
                    part_deletes.add(part);
                }
                part_deletes
                    .delete(
                        machine.applier.state_versions.blob.as_ref(),
                        machine.shard_id(),
                        GC_BLOB_DELETE_CONCURRENCY_LIMIT.get(&machine.applier.cfg),
                        &*metrics,
                        &metrics.retries.external.compaction_noop_delete,
                    )
                    .await;
            }
        };

        Ok(maintenance)
    }

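    /// Compacts the requested batches into a stream of merge results, one per
    /// chunk of runs produced by [`Compactor::chunk_runs`].
    ///
    /// A rough sketch of the memory accounting, following the code below:
    ///
    /// ```text
    /// // reserved for the part currently being built and written out
    /// in_progress_part_reserved_memory_bytes = 2 * blob_target_size
    /// // the remainder of the bound is available for the fetched input runs
    /// run_reserved_memory_bytes =
    ///     compaction_memory_bound_bytes - in_progress_part_reserved_memory_bytes
    /// ```
    ///
    /// When `incremental_enabled` is true, each chunk is emitted with its own
    /// description and input so it can be applied on its own; otherwise every
    /// result uses the full requested description.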
    pub fn compact_stream(
        cfg: CompactConfig,
        blob: Arc<dyn Blob>,
        metrics: Arc<Metrics>,
        shard_metrics: Arc<ShardMetrics>,
        isolated_runtime: Arc<IsolatedRuntime>,
        req: CompactReq<T>,
        write_schemas: Schemas<K, V>,
        incremental_enabled: bool,
    ) -> impl Stream<Item = Result<FueledMergeRes<T>, anyhow::Error>> {
        async_stream::stream! {
            let () = Self::validate_req(&req)?;

            let mut single_nonempty_batch = None;
            for batch in &req.inputs {
                if batch.batch.len > 0 {
                    match single_nonempty_batch {
                        None => single_nonempty_batch = Some(batch),
                        Some(_previous_nonempty_batch) => {
                            single_nonempty_batch = None;
                            break;
                        }
                    }
                }
            }
            if let Some(single_nonempty_batch) = single_nonempty_batch {
                if single_nonempty_batch.batch.run_splits.len() == 0
                    && single_nonempty_batch.batch.desc.since() != &Antichain::from_elem(T::minimum())
                {
                    metrics.compaction.fast_path_eligible.inc();
                }
            }

            let in_progress_part_reserved_memory_bytes = 2 * cfg.batch.blob_target_size;
            let run_reserved_memory_bytes = cfg
                .compaction_memory_bound_bytes
                .saturating_sub(in_progress_part_reserved_memory_bytes);

            let chunked_runs = Self::chunk_runs(
                &req,
                &cfg,
                &*metrics,
                run_reserved_memory_bytes,
                req.desc.since()
            );
            let total_chunked_runs = chunked_runs.len();

            let parts_before = req.inputs.iter().map(|x| x.batch.parts.len()).sum::<usize>();
            let parts_after = chunked_runs.iter().flat_map(|(_, _, runs, _)| runs.iter().map(|(_, _, parts)| parts.len())).sum::<usize>();
            assert_eq!(parts_before, parts_after, "chunking should not change the number of parts");

            for (applied, (input, desc, runs, run_chunk_max_memory_usage)) in
                chunked_runs.into_iter().enumerate()
            {
                metrics.compaction.chunks_compacted.inc();
                metrics
                    .compaction
                    .runs_compacted
                    .inc_by(u64::cast_from(runs.len()));

                let extra_outstanding_parts = (run_reserved_memory_bytes
                    .saturating_sub(run_chunk_max_memory_usage))
                    / cfg.batch.blob_target_size;
                let mut run_cfg = cfg.clone();
                run_cfg.batch.batch_builder_max_outstanding_parts = 1 + extra_outstanding_parts;

                let desc = if incremental_enabled {
                    desc
                } else {
                    req.desc.clone()
                };

                let runs = runs.iter()
                    .map(|(desc, meta, run)| (*desc, *meta, *run))
                    .collect::<Vec<_>>();

                let batch = Self::compact_runs(
                    &run_cfg,
                    &req.shard_id,
                    &desc,
                    runs,
                    Arc::clone(&blob),
                    Arc::clone(&metrics),
                    Arc::clone(&shard_metrics),
                    Arc::clone(&isolated_runtime),
                    write_schemas.clone(),
                )
                .await?;

                assert!(
                    (batch.len == 0 && batch.parts.len() == 0) || (batch.len > 0 && batch.parts.len() > 0),
                    "updates={}, parts={}",
                    batch.len,
                    batch.parts.len(),
                );

                let clock = cfg.now.clone();
                let active_compaction = if applied < total_chunked_runs - 1 {
                    Some(ActiveCompaction { start_ms: clock() })
                } else {
                    None
                };

                let res = CompactRes {
                    output: batch,
                    input,
                };

                let res = FueledMergeRes {
                    output: res.output,
                    new_active_compaction: active_compaction,
                    input: res.input,
                };

                yield Ok(res);
            }
        }
    }

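    /// Compacts the requested batches into one batch, without incremental
    /// (per-chunk) application of the results.
    ///
    /// A hypothetical call site, assuming the handles (`persist_cfg`, `blob`,
    /// `metrics`, `shard_metrics`, `isolated_runtime`, `write_schemas`) are
    /// already in hand:
    ///
    /// ```ignore
    /// let res = Compactor::<K, V, T, D>::compact(
    ///     CompactConfig::new(&persist_cfg, shard_id),
    ///     Arc::clone(&blob),
    ///     Arc::clone(&metrics),
    ///     Arc::clone(&shard_metrics),
    ///     Arc::clone(&isolated_runtime),
    ///     req.clone(),
    ///     write_schemas,
    /// )
    /// .await?;
    /// // The output covers exactly the requested description.
    /// assert_eq!(res.output.desc, req.desc);
    /// ```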
    pub async fn compact(
        cfg: CompactConfig,
        blob: Arc<dyn Blob>,
        metrics: Arc<Metrics>,
        shard_metrics: Arc<ShardMetrics>,
        isolated_runtime: Arc<IsolatedRuntime>,
        req: CompactReq<T>,
        write_schemas: Schemas<K, V>,
    ) -> Result<CompactRes<T>, anyhow::Error> {
        let stream = Self::compact_stream(
            cfg,
            Arc::clone(&blob),
            Arc::clone(&metrics),
            Arc::clone(&shard_metrics),
            Arc::clone(&isolated_runtime),
            req.clone(),
            write_schemas,
            false,
        );

        Self::compact_all(stream, req).await
    }

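    /// Sorts and groups the input runs into chunks that should each fit within
    /// `run_reserved_memory_bytes`, returning for each chunk its
    /// [`CompactionInput`], output description, runs, and estimated maximum
    /// memory usage. Batches too large to compact whole are split into
    /// per-batch chunks of runs.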
    fn chunk_runs<'a>(
        req: &'a CompactReq<T>,
        cfg: &CompactConfig,
        metrics: &Metrics,
        run_reserved_memory_bytes: usize,
        since: &Antichain<T>,
    ) -> Vec<(
        CompactionInput,
        Description<T>,
        Vec<(&'a Description<T>, &'a RunMeta, &'a [RunPart<T>])>,
        usize,
    )> {
        let _ = input_id_range(req.inputs.iter().map(|x| x.id).collect());

        let mut batches: Vec<_> = req.inputs.iter().map(|x| (x.id, &*x.batch)).collect();
        batches.sort_by_key(|(id, _)| *id);

        let mut chunks = vec![];
        let mut current_chunk_ids = BTreeSet::new();
        let mut current_chunk_descs = Vec::new();
        let mut current_chunk_runs = vec![];
        let mut current_chunk_max_memory_usage = 0;

        fn max_part_bytes<T>(parts: &[RunPart<T>], cfg: &CompactConfig) -> usize {
            parts
                .iter()
                .map(|p| p.max_part_bytes())
                .max()
                .unwrap_or(cfg.batch.blob_target_size)
        }

        fn desc_range<T: Timestamp>(
            descs: impl IntoIterator<Item = Description<T>>,
            since: Antichain<T>,
        ) -> Description<T> {
            let mut descs = descs.into_iter();
            let first = descs.next().expect("non-empty set of descriptions");
            let lower = first.lower().clone();
            let mut upper = first.upper().clone();
            for desc in descs {
                assert_eq!(&upper, desc.lower());
                upper = desc.upper().clone();
            }
            let upper = upper.clone();
            Description::new(lower, upper, since)
        }

        for (spine_id, batch) in batches {
            let batch_size = batch
                .runs()
                .map(|(_, parts)| max_part_bytes(parts, cfg))
                .sum::<usize>();

            let num_runs = batch.run_meta.len();

            let runs = batch.runs().flat_map(|(meta, parts)| {
                if meta.order.unwrap_or(RunOrder::Codec) == cfg.batch.preferred_order {
                    Either::Left(std::iter::once((&batch.desc, meta, parts)))
                } else {
                    soft_assert_or_log!(
                        !parts.iter().any(|r| matches!(r, RunPart::Many(_))),
                        "unexpected out-of-order hollow run"
                    );
                    Either::Right(
                        parts
                            .iter()
                            .map(move |p| (&batch.desc, meta, std::slice::from_ref(p))),
                    )
                }
            });

            if current_chunk_max_memory_usage + batch_size <= run_reserved_memory_bytes
                || current_chunk_runs.len() + num_runs <= 2
            {
                if current_chunk_max_memory_usage + batch_size > run_reserved_memory_bytes {
                    metrics.compaction.memory_violations.inc();
                }
                current_chunk_ids.insert(spine_id);
                current_chunk_descs.push(batch.desc.clone());
                current_chunk_runs.extend(runs);
                current_chunk_max_memory_usage += batch_size;
                continue;
            }

            if !current_chunk_ids.is_empty() {
                chunks.push((
                    input_id_range(std::mem::take(&mut current_chunk_ids)),
                    desc_range(mem::take(&mut current_chunk_descs), since.clone()),
                    std::mem::take(&mut current_chunk_runs),
                    current_chunk_max_memory_usage,
                ));
                current_chunk_max_memory_usage = 0;
            }

            if batch_size <= run_reserved_memory_bytes {
                current_chunk_ids.insert(spine_id);
                current_chunk_descs.push(batch.desc.clone());
                current_chunk_runs.extend(runs);
                current_chunk_max_memory_usage += batch_size;
                continue;
            }

            let mut run_iter = runs.into_iter().peekable();
            debug_assert!(current_chunk_ids.is_empty());
            debug_assert!(current_chunk_descs.is_empty());
            debug_assert!(current_chunk_runs.is_empty());
            debug_assert_eq!(current_chunk_max_memory_usage, 0);
            let mut current_chunk_run_ids = BTreeSet::new();

            while let Some((desc, meta, parts)) = run_iter.next() {
                let run_size = max_part_bytes(parts, cfg);
                current_chunk_runs.push((desc, meta, parts));
                current_chunk_max_memory_usage += run_size;
                current_chunk_run_ids.extend(meta.id);

                if let Some((_, _meta, next_parts)) = run_iter.peek() {
                    let next_size = max_part_bytes(next_parts, cfg);
                    if current_chunk_max_memory_usage + next_size > run_reserved_memory_bytes {
                        if current_chunk_runs.len() == 1 {
                            metrics.compaction.memory_violations.inc();
                            continue;
                        }
                        chunks.push((
                            CompactionInput::PartialBatch(
                                spine_id,
                                mem::take(&mut current_chunk_run_ids),
                            ),
                            desc_range([batch.desc.clone()], since.clone()),
                            std::mem::take(&mut current_chunk_runs),
                            current_chunk_max_memory_usage,
                        ));
                        current_chunk_max_memory_usage = 0;
                    }
                }
            }

            if !current_chunk_runs.is_empty() {
                chunks.push((
                    CompactionInput::PartialBatch(spine_id, mem::take(&mut current_chunk_run_ids)),
                    desc_range([batch.desc.clone()], since.clone()),
                    std::mem::take(&mut current_chunk_runs),
                    current_chunk_max_memory_usage,
                ));
                current_chunk_max_memory_usage = 0;
            }
        }

        if !current_chunk_ids.is_empty() {
            chunks.push((
                input_id_range(current_chunk_ids),
                desc_range(current_chunk_descs, since.clone()),
                current_chunk_runs,
                current_chunk_max_memory_usage,
            ));
        }

        chunks
    }

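    /// Consolidates the given runs into a single [`HollowBatch`] with the
    /// provided description, streaming the merged updates back out to blob
    /// storage in roughly `blob_target_size` chunks.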
    pub(crate) async fn compact_runs(
        cfg: &CompactConfig,
        shard_id: &ShardId,
        desc: &Description<T>,
        runs: Vec<(&Description<T>, &RunMeta, &[RunPart<T>])>,
        blob: Arc<dyn Blob>,
        metrics: Arc<Metrics>,
        shard_metrics: Arc<ShardMetrics>,
        isolated_runtime: Arc<IsolatedRuntime>,
        write_schemas: Schemas<K, V>,
    ) -> Result<HollowBatch<T>, anyhow::Error> {
        let prefetch_budget_bytes = 2 * cfg.batch.blob_target_size;

        let mut timings = Timings::default();

        let mut batch_cfg = cfg.batch.clone();

        batch_cfg.inline_writes_single_max_bytes = 0;

        let parts = BatchParts::new_ordered::<D>(
            batch_cfg,
            cfg.batch.preferred_order,
            Arc::clone(&metrics),
            Arc::clone(&shard_metrics),
            *shard_id,
            Arc::clone(&blob),
            Arc::clone(&isolated_runtime),
            &metrics.compaction.batch,
        );
        let mut batch = BatchBuilderInternal::<K, V, T, D>::new(
            cfg.batch.clone(),
            parts,
            Arc::clone(&metrics),
            write_schemas.clone(),
            Arc::clone(&blob),
            shard_id.clone(),
            cfg.version.clone(),
        );

        let mut consolidator = Consolidator::new(
            format!(
                "{}[lower={:?},upper={:?}]",
                shard_id,
                desc.lower().elements(),
                desc.upper().elements()
            ),
            cfg.fetch_config.clone(),
            *shard_id,
            StructuredSort::<K, V, T, D>::new(write_schemas.clone()),
            blob,
            Arc::clone(&metrics),
            shard_metrics,
            metrics.read.compaction.clone(),
            FetchBatchFilter::Compaction {
                since: desc.since().clone(),
            },
            None,
            prefetch_budget_bytes,
        );

        for (desc, meta, parts) in runs {
            consolidator.enqueue_run(desc, meta, parts.iter().cloned());
        }

        let remaining_budget = consolidator.start_prefetches();
        if remaining_budget.is_none() {
            metrics.compaction.not_all_prefetched.inc();
        }

        loop {
            let mut chunks = vec![];
            let mut total_bytes = 0;
            while total_bytes < cfg.batch.blob_target_size {
                let fetch_start = Instant::now();
                let Some(chunk) = consolidator
                    .next_chunk(
                        cfg.compaction_yield_after_n_updates,
                        cfg.batch.blob_target_size - total_bytes,
                    )
                    .await?
                else {
                    break;
                };
                timings.part_fetching += fetch_start.elapsed();
                total_bytes += chunk.goodbytes();
                chunks.push(chunk);
                tokio::task::yield_now().await;
            }

            let Some(updates) = Part::concat(&chunks).expect("compaction produces well-typed data")
            else {
                break;
            };
            batch.flush_part(desc.clone(), updates).await;
        }
        let mut batch = batch.finish(desc.clone()).await?;

        let has_inline_parts = batch.batch.parts.iter().any(|x| x.is_inline());
        if has_inline_parts {
            error!(%shard_id, ?cfg, "compaction result unexpectedly had inline writes");
            let () = batch
                .flush_to_blob(
                    &cfg.batch,
                    &metrics.compaction.batch,
                    &isolated_runtime,
                    &write_schemas,
                )
                .await;
        }

        timings.record(&metrics);
        Ok(batch.into_hollow_batch())
    }

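    /// Validates that the inputs cover exactly the requested description:
    /// contiguous lowers/uppers that span `req.desc`, with no input since in
    /// advance of the output since.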
    fn validate_req(req: &CompactReq<T>) -> Result<(), anyhow::Error> {
        let mut frontier = req.desc.lower();
        for input in req.inputs.iter() {
            if PartialOrder::less_than(req.desc.since(), input.batch.desc.since()) {
                return Err(anyhow!(
                    "output since {:?} must be at or in advance of input since {:?}",
                    req.desc.since(),
                    input.batch.desc.since()
                ));
            }
            if frontier != input.batch.desc.lower() {
                return Err(anyhow!(
                    "invalid merge of non-consecutive batches {:?} vs {:?}",
                    frontier,
                    input.batch.desc.lower()
                ));
            }
            frontier = input.batch.desc.upper();
        }
        if frontier != req.desc.upper() {
            return Err(anyhow!(
                "invalid merge of non-consecutive batches {:?} vs {:?}",
                frontier,
                req.desc.upper()
            ));
        }
        Ok(())
    }
}

#[derive(Debug, Default)]
struct Timings {
    part_fetching: Duration,
    heap_population: Duration,
}

impl Timings {
    fn record(self, metrics: &Metrics) {
        let Timings {
            part_fetching,
            heap_population,
        } = self;

        metrics
            .compaction
            .steps
            .part_fetch_seconds
            .inc_by(part_fetching.as_secs_f64());
        metrics
            .compaction
            .steps
            .heap_population_seconds
            .inc_by(heap_population.as_secs_f64());
    }
}

#[cfg(test)]
mod tests {
    use mz_dyncfg::ConfigUpdates;
    use mz_ore::{assert_contains, assert_err};
    use mz_persist_types::codec_impls::StringSchema;
    use timely::progress::Antichain;

    use crate::PersistLocation;
    use crate::batch::BLOB_TARGET_SIZE;
    use crate::internal::trace::SpineId;
    use crate::tests::{all_ok, expect_fetch_part, new_test_client_cache};

    use super::*;

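    /// Compacts two batches to a `since` well in advance of their timestamps
    /// and verifies the output batch reads back consolidated as of that
    /// `since`.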
    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn regression_minimum_since(dyncfgs: ConfigUpdates) {
        let data = vec![
            (("0".to_owned(), "zero".to_owned()), 0, 1),
            (("0".to_owned(), "zero".to_owned()), 1, -1),
            (("1".to_owned(), "one".to_owned()), 1, 1),
        ];

        let cache = new_test_client_cache(&dyncfgs);
        cache.cfg.set_config(&BLOB_TARGET_SIZE, 100);
        let (mut write, _) = cache
            .open(PersistLocation::new_in_mem())
            .await
            .expect("client construction failed")
            .expect_open::<String, String, u64, i64>(ShardId::new())
            .await;
        let b0 = write
            .expect_batch(&data[..1], 0, 1)
            .await
            .into_hollow_batch();
        let b1 = write
            .expect_batch(&data[1..], 1, 2)
            .await
            .into_hollow_batch();

        let req = CompactReq {
            shard_id: write.machine.shard_id(),
            desc: Description::new(
                b0.desc.lower().clone(),
                b1.desc.upper().clone(),
                Antichain::from_elem(10u64),
            ),
            inputs: vec![
                IdHollowBatch {
                    batch: Arc::new(b0),
                    id: SpineId(0, 1),
                },
                IdHollowBatch {
                    batch: Arc::new(b1),
                    id: SpineId(1, 2),
                },
            ],
        };
        let schemas = Schemas {
            id: None,
            key: Arc::new(StringSchema),
            val: Arc::new(StringSchema),
        };
        let res = Compactor::<String, String, u64, i64>::compact(
            CompactConfig::new(&write.cfg, write.shard_id()),
            Arc::clone(&write.blob),
            Arc::clone(&write.metrics),
            write.metrics.shards.shard(&write.machine.shard_id(), ""),
            Arc::new(IsolatedRuntime::new_for_tests()),
            req.clone(),
            schemas.clone(),
        )
        .await
        .expect("compaction failed");

        assert_eq!(res.output.desc, req.desc);
        assert_eq!(res.output.len, 1);
        assert_eq!(res.output.part_count(), 1);
        let part = res.output.parts[0].expect_hollow_part();
        let (part, updates) = expect_fetch_part(
            write.blob.as_ref(),
            &part.key.complete(&write.machine.shard_id()),
            &write.metrics,
            &schemas,
        )
        .await;
        assert_eq!(part.desc, res.output.desc);
        assert_eq!(updates, all_ok(&data, 10));
    }

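    /// Exercises dynamically disabling and re-enabling compaction via
    /// `PersistConfig`, as well as the `persist_compaction_check_process_flag`
    /// escape hatch that makes the Compactor ignore the disable flag.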
    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn disable_compaction(dyncfgs: ConfigUpdates) {
        let data = [
            (("0".to_owned(), "zero".to_owned()), 0, 1),
            (("0".to_owned(), "zero".to_owned()), 1, -1),
            (("1".to_owned(), "one".to_owned()), 1, 1),
        ];

        let cache = new_test_client_cache(&dyncfgs);
        cache.cfg.set_config(&BLOB_TARGET_SIZE, 100);
        let (mut write, _) = cache
            .open(PersistLocation::new_in_mem())
            .await
            .expect("client construction failed")
            .expect_open::<String, String, u64, i64>(ShardId::new())
            .await;
        let b0 = write
            .expect_batch(&data[..1], 0, 1)
            .await
            .into_hollow_batch();
        let b1 = write
            .expect_batch(&data[1..], 1, 2)
            .await
            .into_hollow_batch();

        let req = CompactReq {
            shard_id: write.machine.shard_id(),
            desc: Description::new(
                b0.desc.lower().clone(),
                b1.desc.upper().clone(),
                Antichain::from_elem(10u64),
            ),
            inputs: vec![
                IdHollowBatch {
                    batch: Arc::new(b0),
                    id: SpineId(0, 1),
                },
                IdHollowBatch {
                    batch: Arc::new(b1),
                    id: SpineId(1, 2),
                },
            ],
        };
        write.cfg.set_config(&COMPACTION_HEURISTIC_MIN_INPUTS, 1);
        let compactor = write.compact.as_ref().expect("compaction hard disabled");

        write.cfg.disable_compaction();
        let result = compactor
            .compact_and_apply_background(req.clone(), &write.machine)
            .expect("listener")
            .await
            .expect("channel closed");
        assert_err!(result);
        assert_contains!(result.unwrap_err().to_string(), "compaction disabled");

        write.cfg.enable_compaction();
        compactor
            .compact_and_apply_background(req, &write.machine)
            .expect("listener")
            .await
            .expect("channel closed")
            .expect("compaction success");

        let data2 = [
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("2".to_owned(), "two".to_owned()), 3, -1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
        ];

        let b2 = write
            .expect_batch(&data2[..1], 2, 3)
            .await
            .into_hollow_batch();
        let b3 = write
            .expect_batch(&data2[1..], 3, 4)
            .await
            .into_hollow_batch();

        let req = CompactReq {
            shard_id: write.machine.shard_id(),
            desc: Description::new(
                b2.desc.lower().clone(),
                b3.desc.upper().clone(),
                Antichain::from_elem(20u64),
            ),
            inputs: vec![
                IdHollowBatch {
                    batch: Arc::new(b2),
                    id: SpineId(0, 1),
                },
                IdHollowBatch {
                    batch: Arc::new(b3),
                    id: SpineId(1, 2),
                },
            ],
        };
        let compactor = write.compact.as_ref().expect("compaction hard disabled");

        write.cfg.set_config(&COMPACTION_CHECK_PROCESS_FLAG, false);
        write.cfg.disable_compaction();
        compactor
            .compact_and_apply_background(req, &write.machine)
            .expect("listener")
            .await
            .expect("channel closed")
            .expect("compaction success");
    }
}