use std::collections::BTreeSet;
use std::fmt::Debug;
use std::marker::PhantomData;
use std::mem;
use std::sync::Arc;
use std::time::{Duration, Instant};

use anyhow::anyhow;
use differential_dataflow::difference::Semigroup;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::trace::Description;
use futures::{Stream, pin_mut};
use futures_util::StreamExt;
use itertools::Either;
use mz_dyncfg::Config;
use mz_ore::cast::CastFrom;
use mz_ore::error::ErrorExt;
use mz_ore::now::NowFn;
use mz_ore::soft_assert_or_log;
use mz_persist::location::Blob;
use mz_persist_types::part::Part;
use mz_persist_types::{Codec, Codec64};
use timely::PartialOrder;
use timely::progress::{Antichain, Timestamp};
use tokio::sync::mpsc::Sender;
use tokio::sync::{TryAcquireError, mpsc, oneshot};
use tracing::{Instrument, Span, debug, debug_span, error, trace, warn};

use crate::async_runtime::IsolatedRuntime;
use crate::batch::{BatchBuilderConfig, BatchBuilderInternal, BatchParts, PartDeletes};
use crate::cfg::{
    COMPACTION_HEURISTIC_MIN_INPUTS, COMPACTION_HEURISTIC_MIN_PARTS,
    COMPACTION_HEURISTIC_MIN_UPDATES, COMPACTION_MEMORY_BOUND_BYTES,
    GC_BLOB_DELETE_CONCURRENCY_LIMIT, MiB,
};
use crate::fetch::{FetchBatchFilter, FetchConfig};
use crate::internal::encoding::Schemas;
use crate::internal::gc::GarbageCollector;
use crate::internal::machine::Machine;
use crate::internal::maintenance::RoutineMaintenance;
use crate::internal::metrics::ShardMetrics;
use crate::internal::state::{HollowBatch, RunMeta, RunOrder, RunPart};
use crate::internal::trace::{
    ActiveCompaction, ApplyMergeResult, CompactionInput, FueledMergeRes, IdHollowBatch, SpineId,
    id_range,
};
use crate::iter::{Consolidator, StructuredSort};
use crate::{Metrics, PersistConfig, ShardId};

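/// A request to compact a contiguous set of input batches from one shard into
/// a single output batch described by `desc`.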
#[derive(Debug, Clone)]
pub struct CompactReq<T> {
    pub shard_id: ShardId,
    pub desc: Description<T>,
    pub inputs: Vec<IdHollowBatch<T>>,
}

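/// The result of compaction: the consolidated output batch, plus a description
/// of the inputs that it replaces.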
#[derive(Debug)]
pub struct CompactRes<T> {
    pub output: HollowBatch<T>,
    pub input: CompactionInput,
}

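/// Configuration for a single compaction run, derived from [`PersistConfig`].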
#[derive(Debug, Clone)]
pub struct CompactConfig {
    pub(crate) compaction_memory_bound_bytes: usize,
    pub(crate) compaction_yield_after_n_updates: usize,
    pub(crate) version: semver::Version,
    pub(crate) batch: BatchBuilderConfig,
    pub(crate) fetch_config: FetchConfig,
    pub(crate) now: NowFn,
}

impl CompactConfig {
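    /// Initialize the compaction config from Persist's config.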
    pub fn new(value: &PersistConfig, shard_id: ShardId) -> Self {
        CompactConfig {
            compaction_memory_bound_bytes: COMPACTION_MEMORY_BOUND_BYTES.get(value),
            compaction_yield_after_n_updates: value.compaction_yield_after_n_updates,
            version: value.build_version.clone(),
            batch: BatchBuilderConfig::new(value, shard_id),
            fetch_config: FetchConfig::from_persist_config(value),
            now: value.now.clone(),
        }
    }
}

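/// A service for requesting and applying compaction in the background.
///
/// Requests are sent over a bounded channel to a scheduler task (spawned in
/// [`Compactor::new`]) that limits how many compactions run at once via a
/// semaphore and hands each request off to its own worker task.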
#[derive(Debug)]
pub struct Compactor<K, V, T, D> {
    cfg: PersistConfig,
    metrics: Arc<Metrics>,
    sender: Sender<(
        Instant,
        CompactReq<T>,
        Machine<K, V, T, D>,
        oneshot::Sender<Result<(), anyhow::Error>>,
    )>,
    _phantom: PhantomData<fn() -> D>,
}

impl<K, V, T, D> Clone for Compactor<K, V, T, D> {
    fn clone(&self) -> Self {
        Compactor {
            cfg: self.cfg.clone(),
            metrics: Arc::clone(&self.metrics),
            sender: self.sender.clone(),
            _phantom: Default::default(),
        }
    }
}

pub(crate) const COMPACTION_MINIMUM_TIMEOUT: Config<Duration> = Config::new(
    "persist_compaction_minimum_timeout",
    Duration::from_secs(90),
    "\
    The minimum amount of time to allow a persist compaction request to run \
    before timing it out (Materialize).",
);

pub(crate) const COMPACTION_USE_MOST_RECENT_SCHEMA: Config<bool> = Config::new(
    "persist_compaction_use_most_recent_schema",
    true,
    "\
    Use the most recent schema from all the Runs that are currently being \
    compacted, instead of the schema on the current write handle (Materialize).
    ",
);

pub(crate) const COMPACTION_CHECK_PROCESS_FLAG: Config<bool> = Config::new(
    "persist_compaction_check_process_flag",
    true,
    "Whether Compactor will obey the process_requests flag in PersistConfig, \
    which allows dynamically disabling compaction. If false, all compaction requests will be processed.",
);

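/// Describes a compaction input as the range covered by the given spine ids.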
fn input_id_range(ids: BTreeSet<SpineId>) -> CompactionInput {
    let id = id_range(ids);

    CompactionInput::IdRange(id)
}

impl<K, V, T, D> Compactor<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64 + Sync,
    D: Semigroup + Ord + Codec64 + Send + Sync,
{
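    /// Creates a new [`Compactor`] and spawns the background scheduler task
    /// that receives requests, enforces the concurrency limit, and runs each
    /// compaction on its own worker task.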
    pub fn new(
        cfg: PersistConfig,
        metrics: Arc<Metrics>,
        write_schemas: Schemas<K, V>,
        gc: GarbageCollector<K, V, T, D>,
    ) -> Self {
        let (compact_req_sender, mut compact_req_receiver) = mpsc::channel::<(
            Instant,
            CompactReq<T>,
            Machine<K, V, T, D>,
            oneshot::Sender<Result<(), anyhow::Error>>,
        )>(cfg.compaction_queue_size);
        let concurrency_limit = Arc::new(tokio::sync::Semaphore::new(
            cfg.compaction_concurrency_limit,
        ));
        let check_process_requests = COMPACTION_CHECK_PROCESS_FLAG.handle(&cfg.configs);
        let process_requests = Arc::clone(&cfg.compaction_process_requests);

        let _worker_handle = mz_ore::task::spawn(|| "PersistCompactionScheduler", async move {
            while let Some((enqueued, req, machine, completer)) = compact_req_receiver.recv().await
            {
                assert_eq!(req.shard_id, machine.shard_id());
                let metrics = Arc::clone(&machine.applier.metrics);

                if check_process_requests.get()
                    && !process_requests.load(std::sync::atomic::Ordering::Relaxed)
                {
                    let _ = completer.send(Err(anyhow::anyhow!("compaction disabled")));
                    metrics.compaction.disabled.inc();
                    tracing::warn!(shard_id = ?req.shard_id, "Dropping compaction request on the floor.");

                    continue;
                }

                let permit = {
                    let inner = Arc::clone(&concurrency_limit);
                    match inner.try_acquire_owned() {
                        Ok(permit) => permit,
                        Err(TryAcquireError::NoPermits) => {
                            metrics.compaction.concurrency_waits.inc();
                            Arc::clone(&concurrency_limit)
                                .acquire_owned()
                                .await
                                .expect("semaphore is never closed")
                        }
                        Err(TryAcquireError::Closed) => {
                            warn!("semaphore for shard {} is closed", machine.shard_id());
                            continue;
                        }
                    }
                };
                metrics
                    .compaction
                    .queued_seconds
                    .inc_by(enqueued.elapsed().as_secs_f64());

                let write_schemas = write_schemas.clone();

                let compact_span =
                    debug_span!(parent: None, "compact::apply", shard_id=%machine.shard_id());
                compact_span.follows_from(&Span::current());
                let gc = gc.clone();
                mz_ore::task::spawn(|| "PersistCompactionWorker", async move {
                    let res = Self::compact_and_apply(&machine, req, write_schemas)
                        .instrument(compact_span)
                        .await;
                    if let Ok(maintenance) = res {
                        maintenance.start_performing(&machine, &gc);
                    }

                    let _ = completer.send(Ok(()));

                    drop(permit);
                });
            }
        });

        Compactor {
            cfg,
            metrics,
            sender: compact_req_sender,
            _phantom: PhantomData,
        }
    }

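    /// Enqueues a [`CompactReq`] to be processed in the background, returning
    /// a receiver that resolves once the request has been handled, or `None`
    /// if the request was skipped by the heuristics or dropped because the
    /// queue was full.
    ///
    /// A sketch of a call site (hypothetical; `compactor`, `req`, and
    /// `machine` are assumed to already exist):
    ///
    /// ```ignore
    /// if let Some(done) = compactor.compact_and_apply_background(req, &machine) {
    ///     // Waiting is optional: dropping the receiver does not cancel the work.
    ///     let _ = done.await;
    /// }
    /// ```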
    pub fn compact_and_apply_background(
        &self,
        req: CompactReq<T>,
        machine: &Machine<K, V, T, D>,
    ) -> Option<oneshot::Receiver<Result<(), anyhow::Error>>> {
        let should_compact = req.inputs.len() >= COMPACTION_HEURISTIC_MIN_INPUTS.get(&self.cfg)
            || req
                .inputs
                .iter()
                .map(|x| x.batch.part_count())
                .sum::<usize>()
                >= COMPACTION_HEURISTIC_MIN_PARTS.get(&self.cfg)
            || req.inputs.iter().map(|x| x.batch.len).sum::<usize>()
                >= COMPACTION_HEURISTIC_MIN_UPDATES.get(&self.cfg);
        if !should_compact {
            self.metrics.compaction.skipped.inc();
            return None;
        }

        let (compaction_completed_sender, compaction_completed_receiver) = oneshot::channel();
        let new_compaction_sender = self.sender.clone();

        self.metrics.compaction.requested.inc();
        let send = new_compaction_sender.try_send((
            Instant::now(),
            req,
            machine.clone(),
            compaction_completed_sender,
        ));
        if let Err(_) = send {
            self.metrics.compaction.dropped.inc();
            return None;
        }

        Some(compaction_completed_receiver)
    }

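    /// Runs the compaction described by `req` and applies the result to the
    /// shard's state, bounded by a timeout that scales with the input size.
    /// Returns any maintenance work produced by the resulting state changes.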
    pub(crate) async fn compact_and_apply(
        machine: &Machine<K, V, T, D>,
        req: CompactReq<T>,
        write_schemas: Schemas<K, V>,
    ) -> Result<RoutineMaintenance, anyhow::Error> {
        let metrics = Arc::clone(&machine.applier.metrics);
        metrics.compaction.started.inc();
        let start = Instant::now();

        let total_input_bytes = req
            .inputs
            .iter()
            .map(|batch| batch.batch.encoded_size_bytes())
            .sum::<usize>();
        let timeout = Duration::max(
            COMPACTION_MINIMUM_TIMEOUT.get(&machine.applier.cfg),
            Duration::from_secs(u64::cast_from(total_input_bytes / MiB)),
        );
        let compaction_schema_id = req
            .inputs
            .iter()
            .flat_map(|batch| batch.batch.run_meta.iter())
            .filter_map(|run_meta| run_meta.schema)
            .max();
        let maybe_compaction_schema = match compaction_schema_id {
            Some(id) => machine
                .get_schema(id)
                .map(|(key_schema, val_schema)| (id, key_schema, val_schema)),
            None => None,
        };
        let use_most_recent_schema = COMPACTION_USE_MOST_RECENT_SCHEMA.get(&machine.applier.cfg);

        let compaction_schema = match maybe_compaction_schema {
            Some((id, key_schema, val_schema)) if use_most_recent_schema => {
                metrics.compaction.schema_selection.recent_schema.inc();
                Schemas {
                    id: Some(id),
                    key: Arc::new(key_schema),
                    val: Arc::new(val_schema),
                }
            }
            Some(_) => {
                metrics.compaction.schema_selection.disabled.inc();
                write_schemas
            }
            None => {
                metrics.compaction.schema_selection.no_schema.inc();
                write_schemas
            }
        };

        trace!(
            "compaction request for {}MBs ({} bytes), with timeout of {}s, and schema {:?}.",
            total_input_bytes / MiB,
            total_input_bytes,
            timeout.as_secs_f64(),
            compaction_schema.id,
        );

        let isolated_runtime = Arc::clone(&machine.isolated_runtime);
        let machine_clone = machine.clone();
        let metrics_clone = Arc::clone(&machine.applier.metrics);
        let compact_span = debug_span!("compact::consolidate");
        let res = tokio::time::timeout(
            timeout,
            isolated_runtime.spawn_named(
                || "persist::compact::consolidate",
                async move {
                    let all_runs_have_uuids = req
                        .inputs
                        .iter()
                        .all(|x| x.batch.runs().all(|(meta, _)| meta.id.is_some()));
                    let all_runs_have_len = req
                        .inputs
                        .iter()
                        .all(|x| x.batch.runs().all(|(meta, _)| meta.len.is_some()));

                    let compact_cfg =
                        CompactConfig::new(&machine_clone.applier.cfg, machine_clone.shard_id());
                    let incremental_enabled = compact_cfg.batch.enable_incremental_compaction
                        && all_runs_have_uuids
                        && all_runs_have_len;
                    let stream = Self::compact_stream(
                        compact_cfg,
                        Arc::clone(&machine_clone.applier.state_versions.blob),
                        Arc::clone(&metrics_clone),
                        Arc::clone(&machine_clone.applier.shard_metrics),
                        Arc::clone(&machine_clone.isolated_runtime),
                        req.clone(),
                        compaction_schema,
                        incremental_enabled,
                    );

                    let maintenance = if incremental_enabled {
                        let mut maintenance = RoutineMaintenance::default();
                        pin_mut!(stream);
                        while let Some(res) = stream.next().await {
                            let res = res?;
                            let new_maintenance =
                                Self::apply(res, &metrics_clone, &machine_clone).await?;
                            maintenance.merge(new_maintenance);
                        }
                        maintenance
                    } else {
                        let res = Self::compact_all(stream, req.clone()).await?;
                        Self::apply(
                            FueledMergeRes {
                                output: res.output,
                                input: CompactionInput::Legacy,
                                new_active_compaction: None,
                            },
                            &metrics_clone,
                            &machine_clone,
                        )
                        .await?
                    };

                    Ok::<_, anyhow::Error>(maintenance)
                }
                .instrument(compact_span),
            ),
        )
        .await;

        metrics
            .compaction
            .seconds
            .inc_by(start.elapsed().as_secs_f64());
        let res = res
            .map_err(|e| {
                metrics.compaction.timed_out.inc();
                anyhow!(
                    "compaction timed out after {}s: {}",
                    timeout.as_secs_f64(),
                    e
                )
            })?
            .map_err(|e| anyhow!(e))?;

        match res {
            Ok(maintenance) => Ok(maintenance),
            Err(err) => {
                metrics.compaction.failed.inc();
                debug!(
                    "compaction for {} failed: {}",
                    machine.shard_id(),
                    err.display_with_causes()
                );
                Err(err)
            }
        }
    }

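    /// Drains a stream of compaction results and concatenates them into a
    /// single [`CompactRes`] covering the full requested description.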
    pub async fn compact_all(
        stream: impl Stream<Item = Result<FueledMergeRes<T>, anyhow::Error>>,
        req: CompactReq<T>,
    ) -> Result<CompactRes<T>, anyhow::Error> {
        pin_mut!(stream);

        let mut all_parts = vec![];
        let mut all_run_splits = vec![];
        let mut all_run_meta = vec![];
        let mut len = 0;

        while let Some(res) = stream.next().await {
            let res = res?.output;
            let (parts, updates, run_meta, run_splits) =
                (res.parts, res.len, res.run_meta, res.run_splits);

            if updates == 0 {
                continue;
            }

            let run_offset = all_parts.len();
            if !all_parts.is_empty() {
                all_run_splits.push(run_offset);
            }
            all_run_splits.extend(run_splits.iter().map(|r| r + run_offset));
            all_run_meta.extend(run_meta);
            all_parts.extend(parts);
            len += updates;
        }

        let batches = req.inputs.iter().map(|x| x.id).collect::<BTreeSet<_>>();
        let input = input_id_range(batches);

        Ok(CompactRes {
            output: HollowBatch::new(
                req.desc.clone(),
                all_parts,
                len,
                all_run_meta,
                all_run_splits,
            ),
            input,
        })
    }

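    /// Applies a compaction result to the shard's state via `merge_res`. If
    /// the result could not be applied, the freshly written parts are deleted
    /// so they don't leak.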
    pub async fn apply(
        res: FueledMergeRes<T>,
        metrics: &Metrics,
        machine: &Machine<K, V, T, D>,
    ) -> Result<RoutineMaintenance, anyhow::Error> {
        let (apply_merge_result, maintenance) = machine.merge_res(&res).await;

        match &apply_merge_result {
            ApplyMergeResult::AppliedExact => {
                metrics.compaction.applied.inc();
                metrics.compaction.applied_exact_match.inc();
                machine.applier.shard_metrics.compaction_applied.inc();
            }
            ApplyMergeResult::AppliedSubset => {
                metrics.compaction.applied.inc();
                metrics.compaction.applied_subset_match.inc();
                machine.applier.shard_metrics.compaction_applied.inc();
            }
            ApplyMergeResult::NotAppliedNoMatch
            | ApplyMergeResult::NotAppliedInvalidSince
            | ApplyMergeResult::NotAppliedTooManyUpdates => {
                if let ApplyMergeResult::NotAppliedTooManyUpdates = &apply_merge_result {
                    metrics.compaction.not_applied_too_many_updates.inc();
                }
                metrics.compaction.noop.inc();
                let mut part_deletes = PartDeletes::default();
                for part in &res.output.parts {
                    part_deletes.add(part);
                }
                part_deletes
                    .delete(
                        machine.applier.state_versions.blob.as_ref(),
                        machine.shard_id(),
                        GC_BLOB_DELETE_CONCURRENCY_LIMIT.get(&machine.applier.cfg),
                        &*metrics,
                        &metrics.retries.external.compaction_noop_delete,
                    )
                    .await;
            }
        };

        Ok(maintenance)
    }

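    /// Compacts the given request, yielding one [`FueledMergeRes`] per chunk
    /// of input runs.
    ///
    /// The inputs are validated and then split into chunks sized to stay
    /// within the configured compaction memory bound (see
    /// [`Self::chunk_runs`]); each chunk is consolidated into an output batch
    /// by [`Self::compact_runs`]. When `incremental_enabled` is true, each
    /// yielded result covers only its chunk's description so it can be applied
    /// on its own; otherwise every result uses the full requested description.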
    pub fn compact_stream(
        cfg: CompactConfig,
        blob: Arc<dyn Blob>,
        metrics: Arc<Metrics>,
        shard_metrics: Arc<ShardMetrics>,
        isolated_runtime: Arc<IsolatedRuntime>,
        req: CompactReq<T>,
        write_schemas: Schemas<K, V>,
        incremental_enabled: bool,
    ) -> impl Stream<Item = Result<FueledMergeRes<T>, anyhow::Error>> {
        async_stream::stream! {
            let () = Self::validate_req(&req)?;

            let mut single_nonempty_batch = None;
            for batch in &req.inputs {
                if batch.batch.len > 0 {
                    match single_nonempty_batch {
                        None => single_nonempty_batch = Some(batch),
                        Some(_previous_nonempty_batch) => {
                            single_nonempty_batch = None;
                            break;
                        }
                    }
                }
            }
            if let Some(single_nonempty_batch) = single_nonempty_batch {
                if single_nonempty_batch.batch.run_splits.len() == 0
                    && single_nonempty_batch.batch.desc.since() != &Antichain::from_elem(T::minimum())
                {
                    metrics.compaction.fast_path_eligible.inc();
                }
            }

            let in_progress_part_reserved_memory_bytes = 2 * cfg.batch.blob_target_size;
            let run_reserved_memory_bytes = cfg
                .compaction_memory_bound_bytes
                .saturating_sub(in_progress_part_reserved_memory_bytes);

            let chunked_runs = Self::chunk_runs(
                &req,
                &cfg,
                &*metrics,
                run_reserved_memory_bytes,
                req.desc.since()
            );
            let total_chunked_runs = chunked_runs.len();

            let parts_before = req.inputs.iter().map(|x| x.batch.parts.len()).sum::<usize>();
            let parts_after = chunked_runs.iter().flat_map(|(_, _, runs, _)| runs.iter().map(|(_, _, parts)| parts.len())).sum::<usize>();
            assert_eq!(parts_before, parts_after, "chunking should not change the number of parts");

            for (applied, (input, desc, runs, run_chunk_max_memory_usage)) in
                chunked_runs.into_iter().enumerate()
            {
                metrics.compaction.chunks_compacted.inc();
                metrics
                    .compaction
                    .runs_compacted
                    .inc_by(u64::cast_from(runs.len()));

                let extra_outstanding_parts = (run_reserved_memory_bytes
                    .saturating_sub(run_chunk_max_memory_usage))
                    / cfg.batch.blob_target_size;
                let mut run_cfg = cfg.clone();
                run_cfg.batch.batch_builder_max_outstanding_parts = 1 + extra_outstanding_parts;

                let desc = if incremental_enabled {
                    desc
                } else {
                    req.desc.clone()
                };

                let runs = runs.iter()
                    .map(|(desc, meta, run)| (*desc, *meta, *run))
                    .collect::<Vec<_>>();

                let batch = Self::compact_runs(
                    &run_cfg,
                    &req.shard_id,
                    &desc,
                    runs,
                    Arc::clone(&blob),
                    Arc::clone(&metrics),
                    Arc::clone(&shard_metrics),
                    Arc::clone(&isolated_runtime),
                    write_schemas.clone(),
                )
                .await?;

                assert!(
                    (batch.len == 0 && batch.parts.len() == 0) || (batch.len > 0 && batch.parts.len() > 0),
                    "updates={}, parts={}",
                    batch.len,
                    batch.parts.len(),
                );

                let clock = cfg.now.clone();
                let active_compaction = if applied < total_chunked_runs - 1 {
                    Some(ActiveCompaction { start_ms: clock() })
                } else {
                    None
                };

                let res = CompactRes {
                    output: batch,
                    input,
                };

                let res = FueledMergeRes {
                    output: res.output,
                    new_active_compaction: active_compaction,
                    input: res.input,
                };

                yield Ok(res);
            }
        }
    }

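    /// Compacts the given request into a single [`CompactRes`], driving
    /// [`Self::compact_stream`] to completion with incremental compaction
    /// disabled.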
    pub async fn compact(
        cfg: CompactConfig,
        blob: Arc<dyn Blob>,
        metrics: Arc<Metrics>,
        shard_metrics: Arc<ShardMetrics>,
        isolated_runtime: Arc<IsolatedRuntime>,
        req: CompactReq<T>,
        write_schemas: Schemas<K, V>,
    ) -> Result<CompactRes<T>, anyhow::Error> {
        let stream = Self::compact_stream(
            cfg,
            Arc::clone(&blob),
            Arc::clone(&metrics),
            Arc::clone(&shard_metrics),
            Arc::clone(&isolated_runtime),
            req.clone(),
            write_schemas,
            false,
        );

        Self::compact_all(stream, req).await
    }

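    /// Sorts the input batches by spine id and greedily groups their runs into
    /// chunks whose estimated memory usage stays under
    /// `run_reserved_memory_bytes`, splitting an oversized batch into
    /// [`CompactionInput::PartialBatch`] chunks when necessary. Returns, per
    /// chunk, the compaction input, the output description, the runs to
    /// consolidate, and the chunk's estimated memory usage in bytes.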
    fn chunk_runs<'a>(
        req: &'a CompactReq<T>,
        cfg: &CompactConfig,
        metrics: &Metrics,
        run_reserved_memory_bytes: usize,
        since: &Antichain<T>,
    ) -> Vec<(
        CompactionInput,
        Description<T>,
        Vec<(&'a Description<T>, &'a RunMeta, &'a [RunPart<T>])>,
        usize,
    )> {
        let _ = input_id_range(req.inputs.iter().map(|x| x.id).collect());

        let mut batches: Vec<_> = req.inputs.iter().map(|x| (x.id, &*x.batch)).collect();
        batches.sort_by_key(|(id, _)| *id);

        let mut chunks = vec![];
        let mut current_chunk_ids = BTreeSet::new();
        let mut current_chunk_descs = Vec::new();
        let mut current_chunk_runs = vec![];
        let mut current_chunk_max_memory_usage = 0;

        fn max_part_bytes<T>(parts: &[RunPart<T>], cfg: &CompactConfig) -> usize {
            parts
                .iter()
                .map(|p| p.max_part_bytes())
                .max()
                .unwrap_or(cfg.batch.blob_target_size)
        }

        fn desc_range<T: Timestamp>(
            descs: impl IntoIterator<Item = Description<T>>,
            since: Antichain<T>,
        ) -> Description<T> {
            let mut descs = descs.into_iter();
            let first = descs.next().expect("non-empty set of descriptions");
            let lower = first.lower().clone();
            let mut upper = first.upper().clone();
            for desc in descs {
                assert_eq!(&upper, desc.lower());
                upper = desc.upper().clone();
            }
            let upper = upper.clone();
            Description::new(lower, upper, since)
        }

        for (spine_id, batch) in batches {
            let batch_size = batch
                .runs()
                .map(|(_, parts)| max_part_bytes(parts, cfg))
                .sum::<usize>();

            let num_runs = batch.run_meta.len();

            let runs = batch.runs().flat_map(|(meta, parts)| {
                if meta.order.unwrap_or(RunOrder::Codec) == cfg.batch.preferred_order {
                    Either::Left(std::iter::once((&batch.desc, meta, parts)))
                } else {
                    soft_assert_or_log!(
                        !parts.iter().any(|r| matches!(r, RunPart::Many(_))),
                        "unexpected out-of-order hollow run"
                    );
                    Either::Right(
                        parts
                            .iter()
                            .map(move |p| (&batch.desc, meta, std::slice::from_ref(p))),
                    )
                }
            });

            if current_chunk_max_memory_usage + batch_size <= run_reserved_memory_bytes
                || current_chunk_runs.len() + num_runs <= 2
            {
                if current_chunk_max_memory_usage + batch_size > run_reserved_memory_bytes {
                    metrics.compaction.memory_violations.inc();
                }
                current_chunk_ids.insert(spine_id);
                current_chunk_descs.push(batch.desc.clone());
                current_chunk_runs.extend(runs);
                current_chunk_max_memory_usage += batch_size;
                continue;
            }

            if !current_chunk_ids.is_empty() {
                chunks.push((
                    input_id_range(std::mem::take(&mut current_chunk_ids)),
                    desc_range(mem::take(&mut current_chunk_descs), since.clone()),
                    std::mem::take(&mut current_chunk_runs),
                    current_chunk_max_memory_usage,
                ));
                current_chunk_max_memory_usage = 0;
            }

            if batch_size <= run_reserved_memory_bytes {
                current_chunk_ids.insert(spine_id);
                current_chunk_descs.push(batch.desc.clone());
                current_chunk_runs.extend(runs);
                current_chunk_max_memory_usage += batch_size;
                continue;
            }

            let mut run_iter = runs.into_iter().peekable();
            debug_assert!(current_chunk_ids.is_empty());
            debug_assert!(current_chunk_descs.is_empty());
            debug_assert!(current_chunk_runs.is_empty());
            debug_assert_eq!(current_chunk_max_memory_usage, 0);
            let mut current_chunk_run_ids = BTreeSet::new();

            while let Some((desc, meta, parts)) = run_iter.next() {
                let run_size = max_part_bytes(parts, cfg);
                current_chunk_runs.push((desc, meta, parts));
                current_chunk_max_memory_usage += run_size;
                current_chunk_run_ids.extend(meta.id);

                if let Some((_, _meta, next_parts)) = run_iter.peek() {
                    let next_size = max_part_bytes(next_parts, cfg);
                    if current_chunk_max_memory_usage + next_size > run_reserved_memory_bytes {
                        if current_chunk_runs.len() == 1 {
                            metrics.compaction.memory_violations.inc();
                            continue;
                        }
                        chunks.push((
                            CompactionInput::PartialBatch(
                                spine_id,
                                mem::take(&mut current_chunk_run_ids),
                            ),
                            desc_range([batch.desc.clone()], since.clone()),
                            std::mem::take(&mut current_chunk_runs),
                            current_chunk_max_memory_usage,
                        ));
                        current_chunk_max_memory_usage = 0;
                    }
                }
            }

            if !current_chunk_runs.is_empty() {
                chunks.push((
                    CompactionInput::PartialBatch(spine_id, mem::take(&mut current_chunk_run_ids)),
                    desc_range([batch.desc.clone()], since.clone()),
                    std::mem::take(&mut current_chunk_runs),
                    current_chunk_max_memory_usage,
                ));
                current_chunk_max_memory_usage = 0;
            }
        }

        if !current_chunk_ids.is_empty() {
            chunks.push((
                input_id_range(current_chunk_ids),
                desc_range(current_chunk_descs, since.clone()),
                current_chunk_runs,
                current_chunk_max_memory_usage,
            ));
        }

        chunks
    }

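    /// Consolidates the given runs into a single [`HollowBatch`] with `desc`,
    /// streaming chunks of updates through a [`Consolidator`] and writing the
    /// output parts to blob storage as they reach the target size.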
    pub(crate) async fn compact_runs(
        cfg: &CompactConfig,
        shard_id: &ShardId,
        desc: &Description<T>,
        runs: Vec<(&Description<T>, &RunMeta, &[RunPart<T>])>,
        blob: Arc<dyn Blob>,
        metrics: Arc<Metrics>,
        shard_metrics: Arc<ShardMetrics>,
        isolated_runtime: Arc<IsolatedRuntime>,
        write_schemas: Schemas<K, V>,
    ) -> Result<HollowBatch<T>, anyhow::Error> {
        let prefetch_budget_bytes = 2 * cfg.batch.blob_target_size;

        let mut timings = Timings::default();

        let mut batch_cfg = cfg.batch.clone();

        batch_cfg.inline_writes_single_max_bytes = 0;

        let parts = BatchParts::new_ordered::<D>(
            batch_cfg,
            cfg.batch.preferred_order,
            Arc::clone(&metrics),
            Arc::clone(&shard_metrics),
            *shard_id,
            Arc::clone(&blob),
            Arc::clone(&isolated_runtime),
            &metrics.compaction.batch,
        );
        let mut batch = BatchBuilderInternal::<K, V, T, D>::new(
            cfg.batch.clone(),
            parts,
            Arc::clone(&metrics),
            write_schemas.clone(),
            Arc::clone(&blob),
            shard_id.clone(),
            cfg.version.clone(),
        );

        let mut consolidator = Consolidator::new(
            format!(
                "{}[lower={:?},upper={:?}]",
                shard_id,
                desc.lower().elements(),
                desc.upper().elements()
            ),
            cfg.fetch_config.clone(),
            *shard_id,
            StructuredSort::<K, V, T, D>::new(write_schemas.clone()),
            blob,
            Arc::clone(&metrics),
            shard_metrics,
            metrics.read.compaction.clone(),
            FetchBatchFilter::Compaction {
                since: desc.since().clone(),
            },
            None,
            prefetch_budget_bytes,
        );

        for (desc, meta, parts) in runs {
            consolidator.enqueue_run(desc, meta, parts.iter().cloned());
        }

        let remaining_budget = consolidator.start_prefetches();
        if remaining_budget.is_none() {
            metrics.compaction.not_all_prefetched.inc();
        }

        loop {
            let mut chunks = vec![];
            let mut total_bytes = 0;
            while total_bytes < cfg.batch.blob_target_size {
                let fetch_start = Instant::now();
                let Some(chunk) = consolidator
                    .next_chunk(
                        cfg.compaction_yield_after_n_updates,
                        cfg.batch.blob_target_size - total_bytes,
                    )
                    .await?
                else {
                    break;
                };
                timings.part_fetching += fetch_start.elapsed();
                total_bytes += chunk.goodbytes();
                chunks.push(chunk);
                tokio::task::yield_now().await;
            }

            let Some(updates) = Part::concat(&chunks).expect("compaction produces well-typed data")
            else {
                break;
            };
            batch.flush_part(desc.clone(), updates).await;
        }
        let mut batch = batch.finish(desc.clone()).await?;

        let has_inline_parts = batch.batch.parts.iter().any(|x| x.is_inline());
        if has_inline_parts {
            error!(%shard_id, ?cfg, "compaction result unexpectedly had inline writes");
            let () = batch
                .flush_to_blob(
                    &cfg.batch,
                    &metrics.compaction.batch,
                    &isolated_runtime,
                    &write_schemas,
                )
                .await;
        }

        timings.record(&metrics);
        Ok(batch.into_hollow_batch())
    }

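    /// Validates that the inputs tile the requested description: their lowers
    /// and uppers must be consecutive and together cover exactly
    /// `[req.desc.lower(), req.desc.upper())`, and no input's since may be in
    /// advance of the output since.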
    fn validate_req(req: &CompactReq<T>) -> Result<(), anyhow::Error> {
        let mut frontier = req.desc.lower();
        for input in req.inputs.iter() {
            if PartialOrder::less_than(req.desc.since(), input.batch.desc.since()) {
                return Err(anyhow!(
                    "output since {:?} must be at or in advance of input since {:?}",
                    req.desc.since(),
                    input.batch.desc.since()
                ));
            }
            if frontier != input.batch.desc.lower() {
                return Err(anyhow!(
                    "invalid merge of non-consecutive batches {:?} vs {:?}",
                    frontier,
                    input.batch.desc.lower()
                ));
            }
            frontier = input.batch.desc.upper();
        }
        if frontier != req.desc.upper() {
            return Err(anyhow!(
                "invalid merge of non-consecutive batches {:?} vs {:?}",
                frontier,
                req.desc.upper()
            ));
        }
        Ok(())
    }
}

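/// Wall-clock timings accumulated during a single compaction and reported to
/// metrics when it finishes.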
#[derive(Debug, Default)]
struct Timings {
    part_fetching: Duration,
    heap_population: Duration,
}

impl Timings {
    fn record(self, metrics: &Metrics) {
        let Timings {
            part_fetching,
            heap_population,
        } = self;

        metrics
            .compaction
            .steps
            .part_fetch_seconds
            .inc_by(part_fetching.as_secs_f64());
        metrics
            .compaction
            .steps
            .heap_population_seconds
            .inc_by(heap_population.as_secs_f64());
    }
}

#[cfg(test)]
mod tests {
    use mz_dyncfg::ConfigUpdates;
    use mz_ore::{assert_contains, assert_err};
    use mz_persist_types::codec_impls::StringSchema;
    use timely::progress::Antichain;

    use crate::PersistLocation;
    use crate::batch::BLOB_TARGET_SIZE;
    use crate::internal::trace::SpineId;
    use crate::tests::{all_ok, expect_fetch_part, new_test_client_cache};

    use super::*;

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn regression_minimum_since(dyncfgs: ConfigUpdates) {
        let data = vec![
            (("0".to_owned(), "zero".to_owned()), 0, 1),
            (("0".to_owned(), "zero".to_owned()), 1, -1),
            (("1".to_owned(), "one".to_owned()), 1, 1),
        ];

        let cache = new_test_client_cache(&dyncfgs);
        cache.cfg.set_config(&BLOB_TARGET_SIZE, 100);
        let (mut write, _) = cache
            .open(PersistLocation::new_in_mem())
            .await
            .expect("client construction failed")
            .expect_open::<String, String, u64, i64>(ShardId::new())
            .await;
        let b0 = write
            .expect_batch(&data[..1], 0, 1)
            .await
            .into_hollow_batch();
        let b1 = write
            .expect_batch(&data[1..], 1, 2)
            .await
            .into_hollow_batch();

        let req = CompactReq {
            shard_id: write.machine.shard_id(),
            desc: Description::new(
                b0.desc.lower().clone(),
                b1.desc.upper().clone(),
                Antichain::from_elem(10u64),
            ),
            inputs: vec![
                IdHollowBatch {
                    batch: Arc::new(b0),
                    id: SpineId(0, 1),
                },
                IdHollowBatch {
                    batch: Arc::new(b1),
                    id: SpineId(1, 2),
                },
            ],
        };
        let schemas = Schemas {
            id: None,
            key: Arc::new(StringSchema),
            val: Arc::new(StringSchema),
        };
        let res = Compactor::<String, String, u64, i64>::compact(
            CompactConfig::new(&write.cfg, write.shard_id()),
            Arc::clone(&write.blob),
            Arc::clone(&write.metrics),
            write.metrics.shards.shard(&write.machine.shard_id(), ""),
            Arc::new(IsolatedRuntime::new_for_tests()),
            req.clone(),
            schemas.clone(),
        )
        .await
        .expect("compaction failed");

        assert_eq!(res.output.desc, req.desc);
        assert_eq!(res.output.len, 1);
        assert_eq!(res.output.part_count(), 1);
        let part = res.output.parts[0].expect_hollow_part();
        let (part, updates) = expect_fetch_part(
            write.blob.as_ref(),
            &part.key.complete(&write.machine.shard_id()),
            &write.metrics,
            &schemas,
        )
        .await;
        assert_eq!(part.desc, res.output.desc);
        assert_eq!(updates, all_ok(&data, 10));
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn disable_compaction(dyncfgs: ConfigUpdates) {
        let data = [
            (("0".to_owned(), "zero".to_owned()), 0, 1),
            (("0".to_owned(), "zero".to_owned()), 1, -1),
            (("1".to_owned(), "one".to_owned()), 1, 1),
        ];

        let cache = new_test_client_cache(&dyncfgs);
        cache.cfg.set_config(&BLOB_TARGET_SIZE, 100);
        let (mut write, _) = cache
            .open(PersistLocation::new_in_mem())
            .await
            .expect("client construction failed")
            .expect_open::<String, String, u64, i64>(ShardId::new())
            .await;
        let b0 = write
            .expect_batch(&data[..1], 0, 1)
            .await
            .into_hollow_batch();
        let b1 = write
            .expect_batch(&data[1..], 1, 2)
            .await
            .into_hollow_batch();

        let req = CompactReq {
            shard_id: write.machine.shard_id(),
            desc: Description::new(
                b0.desc.lower().clone(),
                b1.desc.upper().clone(),
                Antichain::from_elem(10u64),
            ),
            inputs: vec![
                IdHollowBatch {
                    batch: Arc::new(b0),
                    id: SpineId(0, 1),
                },
                IdHollowBatch {
                    batch: Arc::new(b1),
                    id: SpineId(1, 2),
                },
            ],
        };
        write.cfg.set_config(&COMPACTION_HEURISTIC_MIN_INPUTS, 1);
        let compactor = write.compact.as_ref().expect("compaction hard disabled");

        write.cfg.disable_compaction();
        let result = compactor
            .compact_and_apply_background(req.clone(), &write.machine)
            .expect("listener")
            .await
            .expect("channel closed");
        assert_err!(result);
        assert_contains!(result.unwrap_err().to_string(), "compaction disabled");

        write.cfg.enable_compaction();
        compactor
            .compact_and_apply_background(req, &write.machine)
            .expect("listener")
            .await
            .expect("channel closed")
            .expect("compaction success");

        let data2 = [
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("2".to_owned(), "two".to_owned()), 3, -1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
        ];

        let b2 = write
            .expect_batch(&data2[..1], 2, 3)
            .await
            .into_hollow_batch();
        let b3 = write
            .expect_batch(&data2[1..], 3, 4)
            .await
            .into_hollow_batch();

        let req = CompactReq {
            shard_id: write.machine.shard_id(),
            desc: Description::new(
                b2.desc.lower().clone(),
                b3.desc.upper().clone(),
                Antichain::from_elem(20u64),
            ),
            inputs: vec![
                IdHollowBatch {
                    batch: Arc::new(b2),
                    id: SpineId(0, 1),
                },
                IdHollowBatch {
                    batch: Arc::new(b3),
                    id: SpineId(1, 2),
                },
            ],
        };
        let compactor = write.compact.as_ref().expect("compaction hard disabled");

        write.cfg.set_config(&COMPACTION_CHECK_PROCESS_FLAG, false);
        write.cfg.disable_compaction();
        compactor
            .compact_and_apply_background(req, &write.machine)
            .expect("listener")
            .await
            .expect("channel closed")
            .expect("compaction success");
    }
}
1348}