use std::borrow::Cow;
use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Debug;
use std::marker::PhantomData;
use std::pin::pin;
use std::sync::Arc;
use std::time::{Duration, Instant};

use anyhow::anyhow;
use differential_dataflow::difference::Semigroup;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::trace::Description;
use futures::{Stream, pin_mut};
use futures_util::StreamExt;
use itertools::Itertools;
use mz_dyncfg::Config;
use mz_ore::cast::CastFrom;
use mz_ore::error::ErrorExt;
use mz_ore::now::NowFn;
use mz_persist::location::Blob;
use mz_persist_types::part::Part;
use mz_persist_types::{Codec, Codec64};
use timely::PartialOrder;
use timely::progress::{Antichain, Timestamp};
use tokio::sync::mpsc::Sender;
use tokio::sync::{TryAcquireError, mpsc, oneshot};
use tracing::{Instrument, Span, debug, debug_span, error, trace, warn};

use crate::async_runtime::IsolatedRuntime;
use crate::batch::{BatchBuilderConfig, BatchBuilderInternal, BatchParts, PartDeletes};
use crate::cfg::{
    COMPACTION_HEURISTIC_MIN_INPUTS, COMPACTION_HEURISTIC_MIN_PARTS,
    COMPACTION_HEURISTIC_MIN_UPDATES, COMPACTION_MEMORY_BOUND_BYTES,
    GC_BLOB_DELETE_CONCURRENCY_LIMIT, MiB,
};
use crate::fetch::{FetchBatchFilter, FetchConfig};
use crate::internal::encoding::Schemas;
use crate::internal::gc::GarbageCollector;
use crate::internal::machine::Machine;
use crate::internal::maintenance::RoutineMaintenance;
use crate::internal::metrics::ShardMetrics;
use crate::internal::state::{
    ENABLE_INCREMENTAL_COMPACTION, HollowBatch, RunId, RunMeta, RunOrder, RunPart,
};
use crate::internal::trace::{
    ActiveCompaction, ApplyMergeResult, CompactionInput, FueledMergeRes, IdHollowBatch, SpineId,
    id_range,
};
use crate::iter::{Consolidator, StructuredSort};
use crate::{Metrics, PersistConfig, ShardId};

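/// A request to compact a contiguous set of batches into one output batch.
///
/// `desc` bounds the output: the inputs must tile `[desc.lower(), desc.upper())`
/// without gaps, and the output since must be at or in advance of every
/// input's since (see `validate_req`).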
#[derive(Debug, Clone)]
pub struct CompactReq<T> {
    pub shard_id: ShardId,
    pub desc: Description<T>,
    pub inputs: Vec<IdHollowBatch<T>>,
}

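/// The result of compaction: the compacted batch, plus a description of the
/// input runs or batches it replaces.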
#[derive(Debug)]
pub struct CompactRes<T> {
    pub output: HollowBatch<T>,
    pub input: CompactionInput,
}

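/// Uniquely locates a run within a shard: the spine batch that contains it
/// plus the run's id, if it has one.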
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct RunLocation(pub SpineId, pub Option<RunId>);

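/// A snapshot of the configuration needed to perform a single compaction.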
#[derive(Debug, Clone)]
pub struct CompactConfig {
    pub(crate) compaction_memory_bound_bytes: usize,
    pub(crate) compaction_yield_after_n_updates: usize,
    pub(crate) incremental_compaction: bool,
    pub(crate) version: semver::Version,
    pub(crate) batch: BatchBuilderConfig,
    pub(crate) fetch_config: FetchConfig,
    pub(crate) now: NowFn,
}

impl CompactConfig {
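    /// Initializes the compaction config from Persist's main config and the
    /// per-shard batch config.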
    pub fn new(value: &PersistConfig, shard_id: ShardId) -> Self {
        CompactConfig {
            compaction_memory_bound_bytes: COMPACTION_MEMORY_BOUND_BYTES.get(value),
            compaction_yield_after_n_updates: value.compaction_yield_after_n_updates,
            incremental_compaction: ENABLE_INCREMENTAL_COMPACTION.get(value),
            version: value.build_version.clone(),
            batch: BatchBuilderConfig::new(value, shard_id),
            fetch_config: FetchConfig::from_persist_config(value),
            now: value.now.clone(),
        }
    }
}

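/// A service for performing physical and logical compaction: physical
/// compaction merges adjacent batches, while logical compaction advances
/// timestamps to the output since and consolidates the resulting updates.
///
/// Requests are queued to a background task (see [Compactor::new]) along with
/// the time they were enqueued and a channel for reporting completion.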
#[derive(Debug)]
pub struct Compactor<K, V, T, D> {
    cfg: PersistConfig,
    metrics: Arc<Metrics>,
    sender: Sender<(
        Instant,
        CompactReq<T>,
        Machine<K, V, T, D>,
        oneshot::Sender<Result<(), anyhow::Error>>,
    )>,
    _phantom: PhantomData<fn() -> D>,
}

impl<K, V, T, D> Clone for Compactor<K, V, T, D> {
    fn clone(&self) -> Self {
        Compactor {
            cfg: self.cfg.clone(),
            metrics: Arc::clone(&self.metrics),
            sender: self.sender.clone(),
            _phantom: Default::default(),
        }
    }
}

pub(crate) const COMPACTION_MINIMUM_TIMEOUT: Config<Duration> = Config::new(
    "persist_compaction_minimum_timeout",
    Duration::from_secs(90),
    "\
    The minimum amount of time to allow a persist compaction request to run \
    before timing it out (Materialize).",
);

pub(crate) const COMPACTION_USE_MOST_RECENT_SCHEMA: Config<bool> = Config::new(
    "persist_compaction_use_most_recent_schema",
    true,
    "\
    Use the most recent schema from all the Runs that are currently being \
    compacted, instead of the schema on the current write handle (Materialize).
    ",
);

pub(crate) const COMPACTION_CHECK_PROCESS_FLAG: Config<bool> = Config::new(
    "persist_compaction_check_process_flag",
    true,
    "Whether Compactor will obey the process_requests flag in PersistConfig, \
    which allows dynamically disabling compaction. If false, all compaction requests will be processed.",
);

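/// Wraps the contiguous range covering the given spine ids into a
/// [CompactionInput::IdRange].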
fn input_id_range(ids: BTreeSet<SpineId>) -> CompactionInput {
    let id = id_range(ids);

    CompactionInput::IdRange(id)
}

impl<K, V, T, D> Compactor<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64 + Sync,
    D: Semigroup + Ord + Codec64 + Send + Sync,
{
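    /// Creates a compactor and spawns a background task that services queued
    /// requests: it rejects requests while compaction is dynamically
    /// disabled, bounds concurrent work with a semaphore, and hands each
    /// request to a worker task that compacts, applies the result, and starts
    /// any resulting routine maintenance.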
    pub fn new(
        cfg: PersistConfig,
        metrics: Arc<Metrics>,
        write_schemas: Schemas<K, V>,
        gc: GarbageCollector<K, V, T, D>,
    ) -> Self {
        let (compact_req_sender, mut compact_req_receiver) = mpsc::channel::<(
            Instant,
            CompactReq<T>,
            Machine<K, V, T, D>,
            oneshot::Sender<Result<(), anyhow::Error>>,
        )>(cfg.compaction_queue_size);
        let concurrency_limit = Arc::new(tokio::sync::Semaphore::new(
            cfg.compaction_concurrency_limit,
        ));
        let check_process_requests = COMPACTION_CHECK_PROCESS_FLAG.handle(&cfg.configs);
        let process_requests = Arc::clone(&cfg.compaction_process_requests);

        let _worker_handle = mz_ore::task::spawn(|| "PersistCompactionScheduler", async move {
            while let Some((enqueued, req, machine, completer)) = compact_req_receiver.recv().await
            {
                assert_eq!(req.shard_id, machine.shard_id());
                let metrics = Arc::clone(&machine.applier.metrics);

                if check_process_requests.get()
                    && !process_requests.load(std::sync::atomic::Ordering::Relaxed)
                {
                    let _ = completer.send(Err(anyhow::anyhow!("compaction disabled")));
                    metrics.compaction.disabled.inc();
                    tracing::warn!(shard_id = ?req.shard_id, "Dropping compaction request on the floor.");

                    continue;
                }

                let permit = {
                    let inner = Arc::clone(&concurrency_limit);
                    match inner.try_acquire_owned() {
                        Ok(permit) => permit,
                        Err(TryAcquireError::NoPermits) => {
                            metrics.compaction.concurrency_waits.inc();
                            Arc::clone(&concurrency_limit)
                                .acquire_owned()
                                .await
                                .expect("semaphore is never closed")
                        }
                        Err(TryAcquireError::Closed) => {
                            warn!("semaphore for shard {} is closed", machine.shard_id());
                            continue;
                        }
                    }
                };
                metrics
                    .compaction
                    .queued_seconds
                    .inc_by(enqueued.elapsed().as_secs_f64());

                let write_schemas = write_schemas.clone();

                let compact_span =
                    debug_span!(parent: None, "compact::apply", shard_id=%machine.shard_id());
                compact_span.follows_from(&Span::current());
                let gc = gc.clone();
                mz_ore::task::spawn(|| "PersistCompactionWorker", async move {
                    let res = Self::compact_and_apply(&machine, req, write_schemas)
                        .instrument(compact_span)
                        .await;
                    if let Ok(maintenance) = res {
                        maintenance.start_performing(&machine, &gc);
                    }

                    let _ = completer.send(Ok(()));

                    drop(permit);
                });
            }
        });

        Compactor {
            cfg,
            metrics,
            sender: compact_req_sender,
            _phantom: PhantomData,
        }
    }

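    /// Enqueues a [CompactReq] to be serviced by the background compaction
    /// task, if the request looks big enough to be worth compacting.
    ///
    /// Returns `None` if the request was skipped by the heuristic or the
    /// queue was full; otherwise returns a receiver that resolves once the
    /// compaction completes. The receiver may be safely dropped if the caller
    /// does not wish to wait on completion.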
    pub fn compact_and_apply_background(
        &self,
        req: CompactReq<T>,
        machine: &Machine<K, V, T, D>,
    ) -> Option<oneshot::Receiver<Result<(), anyhow::Error>>> {
        let should_compact = req.inputs.len() >= COMPACTION_HEURISTIC_MIN_INPUTS.get(&self.cfg)
            || req
                .inputs
                .iter()
                .map(|x| x.batch.part_count())
                .sum::<usize>()
                >= COMPACTION_HEURISTIC_MIN_PARTS.get(&self.cfg)
            || req.inputs.iter().map(|x| x.batch.len).sum::<usize>()
                >= COMPACTION_HEURISTIC_MIN_UPDATES.get(&self.cfg);
        if !should_compact {
            self.metrics.compaction.skipped.inc();
            return None;
        }

        let (compaction_completed_sender, compaction_completed_receiver) = oneshot::channel();
        let new_compaction_sender = self.sender.clone();

        self.metrics.compaction.requested.inc();
        let send = new_compaction_sender.try_send((
            Instant::now(),
            req,
            machine.clone(),
            compaction_completed_sender,
        ));
        if send.is_err() {
            self.metrics.compaction.dropped.inc();
            return None;
        }

        Some(compaction_completed_receiver)
    }

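    /// Compacts the given request and applies the result to the machine's
    /// state, all within a timeout of at least [COMPACTION_MINIMUM_TIMEOUT],
    /// scaled up by one second per MiB of input.
    ///
    /// When [COMPACTION_USE_MOST_RECENT_SCHEMA] is set, the most recent
    /// schema found across the input runs is used to re-encode the data;
    /// otherwise the write handle's schemas are used. Incremental compaction
    /// is only attempted if every input run has an id and a length.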
    pub(crate) async fn compact_and_apply(
        machine: &Machine<K, V, T, D>,
        req: CompactReq<T>,
        write_schemas: Schemas<K, V>,
    ) -> Result<RoutineMaintenance, anyhow::Error> {
        let metrics = Arc::clone(&machine.applier.metrics);
        metrics.compaction.started.inc();
        let start = Instant::now();

        let total_input_bytes = req
            .inputs
            .iter()
            .map(|batch| batch.batch.encoded_size_bytes())
            .sum::<usize>();
        let timeout = Duration::max(
            COMPACTION_MINIMUM_TIMEOUT.get(&machine.applier.cfg),
            Duration::from_secs(u64::cast_from(total_input_bytes / MiB)),
        );
        let compaction_schema_id = req
            .inputs
            .iter()
            .flat_map(|batch| batch.batch.run_meta.iter())
            .filter_map(|run_meta| run_meta.schema)
            .max();
        let maybe_compaction_schema = match compaction_schema_id {
            Some(id) => machine
                .get_schema(id)
                .map(|(key_schema, val_schema)| (id, key_schema, val_schema)),
            None => None,
        };
        let use_most_recent_schema = COMPACTION_USE_MOST_RECENT_SCHEMA.get(&machine.applier.cfg);

        let compaction_schema = match maybe_compaction_schema {
            Some((id, key_schema, val_schema)) if use_most_recent_schema => {
                metrics.compaction.schema_selection.recent_schema.inc();
                Schemas {
                    id: Some(id),
                    key: Arc::new(key_schema),
                    val: Arc::new(val_schema),
                }
            }
            Some(_) => {
                metrics.compaction.schema_selection.disabled.inc();
                write_schemas
            }
            None => {
                metrics.compaction.schema_selection.no_schema.inc();
                write_schemas
            }
        };

        trace!(
            "compaction request for {}MBs ({} bytes), with timeout of {}s, and schema {:?}.",
            total_input_bytes / MiB,
            total_input_bytes,
            timeout.as_secs_f64(),
            compaction_schema.id,
        );

        let isolated_runtime = Arc::clone(&machine.isolated_runtime);
        let machine_clone = machine.clone();
        let metrics_clone = Arc::clone(&machine.applier.metrics);
        let compact_span = debug_span!("compact::consolidate");
        let res = tokio::time::timeout(
            timeout,
            isolated_runtime.spawn_named(
                || "persist::compact::consolidate",
                async move {
                    let all_runs_have_uuids = req
                        .inputs
                        .iter()
                        .all(|x| x.batch.runs().all(|(meta, _)| meta.id.is_some()));
                    let all_runs_have_len = req
                        .inputs
                        .iter()
                        .all(|x| x.batch.runs().all(|(meta, _)| meta.len.is_some()));

                    let incremental_enabled = ENABLE_INCREMENTAL_COMPACTION
                        .get(&machine_clone.applier.cfg)
                        && all_runs_have_uuids
                        && all_runs_have_len;
                    let stream = Self::compact_stream(
                        CompactConfig::new(&machine_clone.applier.cfg, machine_clone.shard_id()),
                        Arc::clone(&machine_clone.applier.state_versions.blob),
                        Arc::clone(&metrics_clone),
                        Arc::clone(&machine_clone.applier.shard_metrics),
                        Arc::clone(&machine_clone.isolated_runtime),
                        req.clone(),
                        compaction_schema,
                        incremental_enabled,
                    );

                    let maintenance = if incremental_enabled {
                        let mut maintenance = RoutineMaintenance::default();
                        pin_mut!(stream);
                        while let Some(res) = stream.next().await {
                            let res = res?;
                            let new_maintenance =
                                Self::apply(res, &metrics_clone, &machine_clone).await?;
                            maintenance.merge(new_maintenance);
                        }
                        maintenance
                    } else {
                        let res = Self::compact_all(stream, req.clone()).await?;
                        Self::apply(
                            FueledMergeRes {
                                output: res.output,
                                input: res.input,
                                new_active_compaction: None,
                            },
                            &metrics_clone,
                            &machine_clone,
                        )
                        .await?
                    };

                    Ok::<_, anyhow::Error>(maintenance)
                }
                .instrument(compact_span),
            ),
        )
        .await;

        metrics
            .compaction
            .seconds
            .inc_by(start.elapsed().as_secs_f64());
        let res = res
            .map_err(|e| {
                metrics.compaction.timed_out.inc();
                anyhow!(
                    "compaction timed out after {}s: {}",
                    timeout.as_secs_f64(),
                    e
                )
            })?
            .map_err(|e| anyhow!(e))?;

        match res {
            Ok(maintenance) => Ok(maintenance),
            Err(err) => {
                metrics.compaction.failed.inc();
                debug!(
                    "compaction for {} failed: {}",
                    machine.shard_id(),
                    err.display_with_causes()
                );
                Err(err)
            }
        }
    }

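    /// Drains a stream of merge results into a single [CompactRes] whose
    /// output concatenates the non-empty batches (with run offsets shifted
    /// accordingly) and whose input covers the full range of input spine ids.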
    pub async fn compact_all(
        stream: impl Stream<Item = Result<FueledMergeRes<T>, anyhow::Error>>,
        req: CompactReq<T>,
    ) -> Result<CompactRes<T>, anyhow::Error> {
        pin_mut!(stream);

        let mut all_parts = vec![];
        let mut all_run_splits = vec![];
        let mut all_run_meta = vec![];
        let mut len = 0;

        while let Some(res) = stream.next().await {
            let res = res?.output;
            let (parts, updates, run_meta, run_splits) =
                (res.parts, res.len, res.run_meta, res.run_splits);

            if updates == 0 {
                continue;
            }

            let run_offset = all_parts.len();
            if !all_parts.is_empty() {
                all_run_splits.push(run_offset);
            }
            all_run_splits.extend(run_splits.iter().map(|r| r + run_offset));
            all_run_meta.extend(run_meta);
            all_parts.extend(parts);
            len += updates;
        }

        let batches = req.inputs.iter().map(|x| x.id).collect::<BTreeSet<_>>();
        let input = input_id_range(batches);

        Ok(CompactRes {
            output: HollowBatch::new(
                req.desc.clone(),
                all_parts,
                len,
                all_run_meta,
                all_run_splits,
            ),
            input,
        })
    }

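    /// Applies a merge result to the machine's state and records the outcome
    /// in metrics. If the result could not be applied, the newly written
    /// parts are deleted, since nothing references them.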
    pub async fn apply(
        res: FueledMergeRes<T>,
        metrics: &Metrics,
        machine: &Machine<K, V, T, D>,
    ) -> Result<RoutineMaintenance, anyhow::Error> {
        let (apply_merge_result, maintenance) = machine.merge_res(&res).await;

        match &apply_merge_result {
            ApplyMergeResult::AppliedExact => {
                metrics.compaction.applied.inc();
                metrics.compaction.applied_exact_match.inc();
                machine.applier.shard_metrics.compaction_applied.inc();
            }
            ApplyMergeResult::AppliedSubset => {
                metrics.compaction.applied.inc();
                metrics.compaction.applied_subset_match.inc();
                machine.applier.shard_metrics.compaction_applied.inc();
            }
            ApplyMergeResult::NotAppliedNoMatch
            | ApplyMergeResult::NotAppliedInvalidSince
            | ApplyMergeResult::NotAppliedTooManyUpdates => {
                if let ApplyMergeResult::NotAppliedTooManyUpdates = &apply_merge_result {
                    metrics.compaction.not_applied_too_many_updates.inc();
                }
                metrics.compaction.noop.inc();
                let mut part_deletes = PartDeletes::default();
                for part in &res.output.parts {
                    part_deletes.add(part);
                }
                part_deletes
                    .delete(
                        machine.applier.state_versions.blob.as_ref(),
                        machine.shard_id(),
                        GC_BLOB_DELETE_CONCURRENCY_LIMIT.get(&machine.applier.cfg),
                        &*metrics,
                        &metrics.retries.external.compaction_noop_delete,
                    )
                    .await;
            }
        };

        Ok(maintenance)
    }

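    /// Compacts the request's inputs into a stream of merge results, in
    /// bounded memory.
    ///
    /// Of the configured memory bound, `2 * blob_target_size` bytes are
    /// reserved for the in-progress part being read or written; the rest is
    /// budgeted to the input runs, which are chunked so that holding one
    /// (maximum-size) part per run stays within the budget. Any leftover
    /// budget within a chunk allows extra outstanding parts in the batch
    /// builder.
    ///
    /// When `incremental_enabled` is set, a chunk whose runs all come from a
    /// single batch is reported as a [CompactionInput::PartialBatch] so the
    /// result can replace just those runs, and every result except the last
    /// carries an [ActiveCompaction] to signal that more work is in flight.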
    pub fn compact_stream(
        cfg: CompactConfig,
        blob: Arc<dyn Blob>,
        metrics: Arc<Metrics>,
        shard_metrics: Arc<ShardMetrics>,
        isolated_runtime: Arc<IsolatedRuntime>,
        req: CompactReq<T>,
        write_schemas: Schemas<K, V>,
        incremental_enabled: bool,
    ) -> impl Stream<Item = Result<FueledMergeRes<T>, anyhow::Error>> {
        async_stream::stream! {
            let () = Self::validate_req(&req)?;

            let mut single_nonempty_batch = None;
            for batch in &req.inputs {
                if batch.batch.len > 0 {
                    match single_nonempty_batch {
                        None => single_nonempty_batch = Some(batch),
                        Some(_previous_nonempty_batch) => {
                            single_nonempty_batch = None;
                            break;
                        }
                    }
                }
            }
            if let Some(single_nonempty_batch) = single_nonempty_batch {
                if single_nonempty_batch.batch.run_splits.len() == 0
                    && single_nonempty_batch.batch.desc.since() != &Antichain::from_elem(T::minimum())
                {
                    metrics.compaction.fast_path_eligible.inc();
                }
            }

            let in_progress_part_reserved_memory_bytes = 2 * cfg.batch.blob_target_size;
            let run_reserved_memory_bytes = cfg
                .compaction_memory_bound_bytes
                .saturating_sub(in_progress_part_reserved_memory_bytes);

            let ordered_runs =
                Self::flatten_runs(&req, cfg.batch.preferred_order, &*blob, &*metrics).await?;

            let chunked_runs = Self::chunk_runs(
                &ordered_runs,
                &cfg,
                &*metrics,
                run_reserved_memory_bytes,
            );
            let total_chunked_runs = chunked_runs.len();

            for (applied, (runs, run_chunk_max_memory_usage)) in
                chunked_runs.into_iter().enumerate()
            {
                metrics.compaction.chunks_compacted.inc();
                metrics
                    .compaction
                    .runs_compacted
                    .inc_by(u64::cast_from(runs.len()));

                let extra_outstanding_parts = (run_reserved_memory_bytes
                    .saturating_sub(run_chunk_max_memory_usage))
                    / cfg.batch.blob_target_size;
                let mut run_cfg = cfg.clone();
                run_cfg.batch.batch_builder_max_outstanding_parts = 1 + extra_outstanding_parts;

                let (batch_ids, descriptions): (BTreeSet<_>, Vec<_>) = runs.iter()
                    .map(|(run_id, desc, _, _)| (run_id.0, *desc))
                    .unzip();

                let input = if incremental_enabled {
                    let run_ids = runs.iter()
                        .map(|(run_id, _, _, _)| run_id.1.expect("run_id should be present"))
                        .collect::<BTreeSet<_>>();
                    match batch_ids.iter().exactly_one().ok() {
                        Some(batch_id) => CompactionInput::PartialBatch(*batch_id, run_ids),
                        None => input_id_range(batch_ids),
                    }
                } else {
                    input_id_range(batch_ids)
                };

                let desc = if incremental_enabled {
                    let desc_lower = descriptions
                        .iter()
                        .map(|desc| desc.lower())
                        .cloned()
                        .reduce(|a, b| a.meet(&b))
                        .unwrap_or_else(|| req.desc.lower().clone());

                    let desc_upper = descriptions
                        .iter()
                        .map(|desc| desc.upper())
                        .cloned()
                        .reduce(|a, b| a.join(&b))
                        .unwrap_or_else(|| req.desc.upper().clone());

                    Description::new(desc_lower, desc_upper, req.desc.since().clone())
                } else {
                    req.desc.clone()
                };

                let runs = runs.iter()
                    .map(|(_, desc, meta, run)| (*desc, *meta, *run))
                    .collect::<Vec<_>>();

                let batch = Self::compact_runs(
                    &run_cfg,
                    &req.shard_id,
                    &desc,
                    runs,
                    Arc::clone(&blob),
                    Arc::clone(&metrics),
                    Arc::clone(&shard_metrics),
                    Arc::clone(&isolated_runtime),
                    write_schemas.clone(),
                )
                .await?;

                assert!(
                    (batch.len == 0 && batch.parts.len() == 0) || (batch.len > 0 && batch.parts.len() > 0),
                    "updates={}, parts={}",
                    batch.len,
                    batch.parts.len(),
                );

                let clock = cfg.now.clone();
                let active_compaction = if applied < total_chunked_runs - 1 {
                    Some(ActiveCompaction { start_ms: clock() })
                } else {
                    None
                };

                let res = CompactRes {
                    output: batch,
                    input,
                };

                let res = FueledMergeRes {
                    output: res.output,
                    new_active_compaction: active_compaction,
                    input: res.input,
                };

                yield Ok(res);
            }
        }
    }

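    /// Compacts the given request into a single [CompactRes], with
    /// incremental compaction disabled. See [Self::compact_stream] for how
    /// memory is bounded.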
    pub async fn compact(
        cfg: CompactConfig,
        blob: Arc<dyn Blob>,
        metrics: Arc<Metrics>,
        shard_metrics: Arc<ShardMetrics>,
        isolated_runtime: Arc<IsolatedRuntime>,
        req: CompactReq<T>,
        write_schemas: Schemas<K, V>,
    ) -> Result<CompactRes<T>, anyhow::Error> {
        let stream = Self::compact_stream(
            cfg,
            Arc::clone(&blob),
            Arc::clone(&metrics),
            Arc::clone(&shard_metrics),
            Arc::clone(&isolated_runtime),
            req.clone(),
            write_schemas,
            false,
        );

        Self::compact_all(stream, req).await
    }

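    /// Groups the ordered runs into chunks, each estimated (at one
    /// maximum-size part per run) to fit within `run_reserved_memory_bytes`.
    /// Runs from the same spine batch are kept together where possible, so
    /// that incremental compaction can attribute a chunk to a single batch;
    /// a chunk exceeds the budget only when a single run is too large by
    /// itself, which is recorded as a memory violation in metrics.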
    fn chunk_runs<'a>(
        ordered_runs: &'a [(
            RunLocation,
            &'a Description<T>,
            &'a RunMeta,
            Cow<'a, [RunPart<T>]>,
        )],
        cfg: &CompactConfig,
        metrics: &Metrics,
        run_reserved_memory_bytes: usize,
    ) -> Vec<(
        Vec<(
            &'a RunLocation,
            &'a Description<T>,
            &'a RunMeta,
            &'a [RunPart<T>],
        )>,
        usize,
    )> {
        let grouped: BTreeMap<SpineId, Vec<_>> = ordered_runs
            .iter()
            .map(|(run_id, desc, meta, parts)| (run_id.0, (run_id, *desc, *meta, &**parts)))
            .fold(BTreeMap::new(), |mut acc, item| {
                acc.entry(item.0).or_default().push(item.1);
                acc
            });

        let mut grouped = grouped.into_iter().peekable();

        let mut chunks = vec![];
        let mut current_chunk = vec![];
        let mut current_chunk_max_memory_usage = 0;
        let mut mem_violation = false;

        fn max_part_bytes<T>(parts: &[RunPart<T>], cfg: &CompactConfig) -> usize {
            parts
                .iter()
                .map(|p| p.max_part_bytes())
                .max()
                .unwrap_or(cfg.batch.blob_target_size)
        }

        while let Some((_spine_id, runs)) = grouped.next() {
            let batch_size = runs
                .iter()
                .map(|(_, _, _, parts)| max_part_bytes(parts, cfg))
                .sum::<usize>();

            let num_runs = runs.len();

            let memory_violation_risk =
                batch_size > run_reserved_memory_bytes && num_runs == 1 && current_chunk.is_empty();

            if mem_violation && num_runs == 1 {
                current_chunk.extend(runs.clone());
                current_chunk_max_memory_usage += batch_size;
                mem_violation = false;
                continue;
            } else if current_chunk_max_memory_usage + batch_size <= run_reserved_memory_bytes {
                current_chunk.extend(runs);
                current_chunk_max_memory_usage += batch_size;
                continue;
            } else if memory_violation_risk {
                metrics.compaction.memory_violations.inc();
                mem_violation = true;
                current_chunk.extend(runs.clone());
                current_chunk_max_memory_usage += batch_size;
                continue;
            }

            if !current_chunk.is_empty() {
                chunks.push((
                    std::mem::take(&mut current_chunk),
                    current_chunk_max_memory_usage,
                ));
                mem_violation = false;
                current_chunk_max_memory_usage = 0;
            }

            let mut run_iter = runs.into_iter().peekable();
            debug_assert!(current_chunk.is_empty());
            debug_assert_eq!(current_chunk_max_memory_usage, 0);

            while let Some((run_id, desc, meta, parts)) = run_iter.next() {
                let run_size = max_part_bytes(parts, cfg);
                current_chunk.push((run_id, desc, meta, parts));
                current_chunk_max_memory_usage += run_size;

                if let Some((_, _, _, next_parts)) = run_iter.peek() {
                    let next_size = max_part_bytes(next_parts, cfg);
                    if current_chunk_max_memory_usage + next_size > run_reserved_memory_bytes {
                        if current_chunk.len() == 1 {
                            metrics.compaction.memory_violations.inc();
                            continue;
                        }
                        chunks.push((
                            std::mem::take(&mut current_chunk),
                            current_chunk_max_memory_usage,
                        ));
                        current_chunk_max_memory_usage = 0;
                    }
                }
            }

            if !current_chunk.is_empty() {
                chunks.push((
                    std::mem::take(&mut current_chunk),
                    current_chunk_max_memory_usage,
                ));
                current_chunk_max_memory_usage = 0;
            }
        }

        if !current_chunk.is_empty() {
            chunks.push((current_chunk, current_chunk_max_memory_usage));
        }

        chunks
    }

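    /// Flattens the request's batches into a list of runs in batch order.
    /// Runs that are not already in `target_order` are split into single-part
    /// runs, since each individual part is internally sorted and so can be
    /// consolidated as a run of its own.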
    async fn flatten_runs<'a>(
        req: &'a CompactReq<T>,
        target_order: RunOrder,
        blob: &'a dyn Blob,
        metrics: &'a Metrics,
    ) -> anyhow::Result<
        Vec<(
            RunLocation,
            &'a Description<T>,
            &'a RunMeta,
            Cow<'a, [RunPart<T>]>,
        )>,
    > {
        let total_number_of_runs = req
            .inputs
            .iter()
            .map(|x| x.batch.run_splits.len() + 1)
            .sum::<usize>();

        let batch_runs: Vec<_> = req
            .inputs
            .iter()
            .map(|x| (x.id, &x.batch.desc, x.batch.runs()))
            .collect();

        let mut ordered_runs = Vec::with_capacity(total_number_of_runs);
        for (spine_id, desc, runs) in batch_runs {
            for (meta, run) in runs {
                let run_id = RunLocation(spine_id, meta.id);
                let same_order = meta.order.unwrap_or(RunOrder::Codec) == target_order;
                if same_order {
                    ordered_runs.push((run_id, desc, meta, Cow::Borrowed(run)));
                } else {
                    for part in run {
                        let mut batch_parts = pin!(part.part_stream(req.shard_id, blob, metrics));
                        while let Some(part) = batch_parts.next().await {
                            ordered_runs.push((
                                run_id,
                                desc,
                                meta,
                                Cow::Owned(vec![RunPart::Single(part?.into_owned())]),
                            ));
                        }
                    }
                }
            }
        }

        Ok(ordered_runs)
    }

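    /// Consolidates the given runs into a single output batch. If the inputs
    /// are already in the preferred order, the output is a single run.
    ///
    /// Inline writes are disabled here (and flushed to blob if any slip
    /// through), so compaction also serves to move inline data out of state.
    /// A prefetch budget of `2 * blob_target_size` is used to overlap part
    /// fetches with consolidation.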
    pub(crate) async fn compact_runs(
        cfg: &CompactConfig,
        shard_id: &ShardId,
        desc: &Description<T>,
        runs: Vec<(&Description<T>, &RunMeta, &[RunPart<T>])>,
        blob: Arc<dyn Blob>,
        metrics: Arc<Metrics>,
        shard_metrics: Arc<ShardMetrics>,
        isolated_runtime: Arc<IsolatedRuntime>,
        write_schemas: Schemas<K, V>,
    ) -> Result<HollowBatch<T>, anyhow::Error> {
        let prefetch_budget_bytes = 2 * cfg.batch.blob_target_size;

        let mut timings = Timings::default();

        let mut batch_cfg = cfg.batch.clone();

        batch_cfg.inline_writes_single_max_bytes = 0;

        let parts = BatchParts::new_ordered::<D>(
            batch_cfg,
            cfg.batch.preferred_order,
            Arc::clone(&metrics),
            Arc::clone(&shard_metrics),
            *shard_id,
            Arc::clone(&blob),
            Arc::clone(&isolated_runtime),
            &metrics.compaction.batch,
        );
        let mut batch = BatchBuilderInternal::<K, V, T, D>::new(
            cfg.batch.clone(),
            parts,
            Arc::clone(&metrics),
            write_schemas.clone(),
            Arc::clone(&blob),
            shard_id.clone(),
            cfg.version.clone(),
        );

        let mut consolidator = Consolidator::new(
            format!(
                "{}[lower={:?},upper={:?}]",
                shard_id,
                desc.lower().elements(),
                desc.upper().elements()
            ),
            cfg.fetch_config.clone(),
            *shard_id,
            StructuredSort::<K, V, T, D>::new(write_schemas.clone()),
            blob,
            Arc::clone(&metrics),
            shard_metrics,
            metrics.read.compaction.clone(),
            FetchBatchFilter::Compaction {
                since: desc.since().clone(),
            },
            None,
            prefetch_budget_bytes,
        );

        for (desc, meta, parts) in runs {
            consolidator.enqueue_run(desc, meta, parts.iter().cloned());
        }

        let remaining_budget = consolidator.start_prefetches();
        if remaining_budget.is_none() {
            metrics.compaction.not_all_prefetched.inc();
        }

        loop {
            let mut chunks = vec![];
            let mut total_bytes = 0;
            while total_bytes < cfg.batch.blob_target_size {
                let fetch_start = Instant::now();
                let Some(chunk) = consolidator
                    .next_chunk(
                        cfg.compaction_yield_after_n_updates,
                        cfg.batch.blob_target_size - total_bytes,
                    )
                    .await?
                else {
                    break;
                };
                timings.part_fetching += fetch_start.elapsed();
                total_bytes += chunk.goodbytes();
                chunks.push(chunk);
                tokio::task::yield_now().await;
            }

            let Some(updates) = Part::concat(&chunks).expect("compaction produces well-typed data")
            else {
                break;
            };
            batch.flush_part(desc.clone(), updates).await;
        }
        let mut batch = batch.finish(desc.clone()).await?;

        let has_inline_parts = batch.batch.parts.iter().any(|x| x.is_inline());
        if has_inline_parts {
            error!(%shard_id, ?cfg, "compaction result unexpectedly had inline writes");
            let () = batch
                .flush_to_blob(
                    &cfg.batch,
                    &metrics.compaction.batch,
                    &isolated_runtime,
                    &write_schemas,
                )
                .await;
        }

        timings.record(&metrics);
        Ok(batch.into_hollow_batch())
    }

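    /// Validates that a compaction request is well-formed: no input's since
    /// may be in advance of the output since, and the inputs' lower and upper
    /// bounds must be consecutive, tiling `[desc.lower(), desc.upper())`
    /// exactly.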
    fn validate_req(req: &CompactReq<T>) -> Result<(), anyhow::Error> {
        let mut frontier = req.desc.lower();
        for input in req.inputs.iter() {
            if PartialOrder::less_than(req.desc.since(), input.batch.desc.since()) {
                return Err(anyhow!(
                    "output since {:?} must be at or in advance of input since {:?}",
                    req.desc.since(),
                    input.batch.desc.since()
                ));
            }
            if frontier != input.batch.desc.lower() {
                return Err(anyhow!(
                    "invalid merge of non-consecutive batches {:?} vs {:?}",
                    frontier,
                    input.batch.desc.lower()
                ));
            }
            frontier = input.batch.desc.upper();
        }
        if frontier != req.desc.upper() {
            return Err(anyhow!(
                "invalid merge of non-consecutive batches {:?} vs {:?}",
                frontier,
                req.desc.upper()
            ));
        }
        Ok(())
    }
}

#[derive(Debug, Default)]
struct Timings {
    part_fetching: Duration,
    heap_population: Duration,
}

impl Timings {
    fn record(self, metrics: &Metrics) {
        let Timings {
            part_fetching,
            heap_population,
        } = self;

        metrics
            .compaction
            .steps
            .part_fetch_seconds
            .inc_by(part_fetching.as_secs_f64());
        metrics
            .compaction
            .steps
            .heap_population_seconds
            .inc_by(heap_population.as_secs_f64());
    }
}

#[cfg(test)]
mod tests {
    use mz_dyncfg::ConfigUpdates;
    use mz_ore::{assert_contains, assert_err};
    use mz_persist_types::codec_impls::StringSchema;
    use timely::progress::Antichain;

    use crate::PersistLocation;
    use crate::batch::BLOB_TARGET_SIZE;
    use crate::internal::trace::SpineId;
    use crate::tests::{all_ok, expect_fetch_part, new_test_client_cache};

    use super::*;

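    // Regression test: the output since of a compaction request may be in
    // advance of the since of all of its inputs; the compacted batch must
    // reflect the requested since (here, 10) in both its description and its
    // consolidated updates.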
    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn regression_minimum_since(dyncfgs: ConfigUpdates) {
        let data = vec![
            (("0".to_owned(), "zero".to_owned()), 0, 1),
            (("0".to_owned(), "zero".to_owned()), 1, -1),
            (("1".to_owned(), "one".to_owned()), 1, 1),
        ];

        let cache = new_test_client_cache(&dyncfgs);
        cache.cfg.set_config(&BLOB_TARGET_SIZE, 100);
        let (mut write, _) = cache
            .open(PersistLocation::new_in_mem())
            .await
            .expect("client construction failed")
            .expect_open::<String, String, u64, i64>(ShardId::new())
            .await;
        let b0 = write
            .expect_batch(&data[..1], 0, 1)
            .await
            .into_hollow_batch();
        let b1 = write
            .expect_batch(&data[1..], 1, 2)
            .await
            .into_hollow_batch();

        let req = CompactReq {
            shard_id: write.machine.shard_id(),
            desc: Description::new(
                b0.desc.lower().clone(),
                b1.desc.upper().clone(),
                Antichain::from_elem(10u64),
            ),
            inputs: vec![
                IdHollowBatch {
                    batch: Arc::new(b0),
                    id: SpineId(0, 1),
                },
                IdHollowBatch {
                    batch: Arc::new(b1),
                    id: SpineId(1, 2),
                },
            ],
        };
        let schemas = Schemas {
            id: None,
            key: Arc::new(StringSchema),
            val: Arc::new(StringSchema),
        };
        let res = Compactor::<String, String, u64, i64>::compact(
            CompactConfig::new(&write.cfg, write.shard_id()),
            Arc::clone(&write.blob),
            Arc::clone(&write.metrics),
            write.metrics.shards.shard(&write.machine.shard_id(), ""),
            Arc::new(IsolatedRuntime::new_for_tests()),
            req.clone(),
            schemas.clone(),
        )
        .await
        .expect("compaction failed");

        assert_eq!(res.output.desc, req.desc);
        assert_eq!(res.output.len, 1);
        assert_eq!(res.output.part_count(), 1);
        let part = res.output.parts[0].expect_hollow_part();
        let (part, updates) = expect_fetch_part(
            write.blob.as_ref(),
            &part.key.complete(&write.machine.shard_id()),
            &write.metrics,
            &schemas,
        )
        .await;
        assert_eq!(part.desc, res.output.desc);
        assert_eq!(updates, all_ok(&data, 10));
    }

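    // Exercises the dynamic disable-compaction flag: while disabled, queued
    // requests are rejected with a "compaction disabled" error; re-enabling
    // restores normal processing; and clearing COMPACTION_CHECK_PROCESS_FLAG
    // makes the compactor ignore the disable flag entirely.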
    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn disable_compaction(dyncfgs: ConfigUpdates) {
        let data = [
            (("0".to_owned(), "zero".to_owned()), 0, 1),
            (("0".to_owned(), "zero".to_owned()), 1, -1),
            (("1".to_owned(), "one".to_owned()), 1, 1),
        ];

        let cache = new_test_client_cache(&dyncfgs);
        cache.cfg.set_config(&BLOB_TARGET_SIZE, 100);
        let (mut write, _) = cache
            .open(PersistLocation::new_in_mem())
            .await
            .expect("client construction failed")
            .expect_open::<String, String, u64, i64>(ShardId::new())
            .await;
        let b0 = write
            .expect_batch(&data[..1], 0, 1)
            .await
            .into_hollow_batch();
        let b1 = write
            .expect_batch(&data[1..], 1, 2)
            .await
            .into_hollow_batch();

        let req = CompactReq {
            shard_id: write.machine.shard_id(),
            desc: Description::new(
                b0.desc.lower().clone(),
                b1.desc.upper().clone(),
                Antichain::from_elem(10u64),
            ),
            inputs: vec![
                IdHollowBatch {
                    batch: Arc::new(b0),
                    id: SpineId(0, 1),
                },
                IdHollowBatch {
                    batch: Arc::new(b1),
                    id: SpineId(1, 2),
                },
            ],
        };
        write.cfg.set_config(&COMPACTION_HEURISTIC_MIN_INPUTS, 1);
        let compactor = write.compact.as_ref().expect("compaction hard disabled");

        write.cfg.disable_compaction();
        let result = compactor
            .compact_and_apply_background(req.clone(), &write.machine)
            .expect("listener")
            .await
            .expect("channel closed");
        assert_err!(result);
        assert_contains!(result.unwrap_err().to_string(), "compaction disabled");

        write.cfg.enable_compaction();
        compactor
            .compact_and_apply_background(req, &write.machine)
            .expect("listener")
            .await
            .expect("channel closed")
            .expect("compaction success");

        let data2 = [
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("2".to_owned(), "two".to_owned()), 3, -1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
        ];

        let b2 = write
            .expect_batch(&data2[..1], 2, 3)
            .await
            .into_hollow_batch();
        let b3 = write
            .expect_batch(&data2[1..], 3, 4)
            .await
            .into_hollow_batch();

        let req = CompactReq {
            shard_id: write.machine.shard_id(),
            desc: Description::new(
                b2.desc.lower().clone(),
                b3.desc.upper().clone(),
                Antichain::from_elem(20u64),
            ),
            inputs: vec![
                IdHollowBatch {
                    batch: Arc::new(b2),
                    id: SpineId(0, 1),
                },
                IdHollowBatch {
                    batch: Arc::new(b3),
                    id: SpineId(1, 2),
                },
            ],
        };
        let compactor = write.compact.as_ref().expect("compaction hard disabled");

        write.cfg.set_config(&COMPACTION_CHECK_PROCESS_FLAG, false);
        write.cfg.disable_compaction();
        compactor
            .compact_and_apply_background(req, &write.machine)
            .expect("listener")
            .await
            .expect("channel closed")
            .expect("compaction success");
    }
}