use std::borrow::Cow;
use std::collections::VecDeque;
use std::fmt::Debug;
use std::marker::PhantomData;
use std::pin::pin;
use std::sync::Arc;
use std::time::{Duration, Instant};

use anyhow::anyhow;
use differential_dataflow::difference::Semigroup;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::trace::Description;
use futures_util::{StreamExt, TryFutureExt};
use mz_dyncfg::Config;
use mz_ore::cast::CastFrom;
use mz_ore::error::ErrorExt;
use mz_persist::location::Blob;
use mz_persist_types::part::Part;
use mz_persist_types::{Codec, Codec64};
use timely::PartialOrder;
use timely::progress::{Antichain, Timestamp};
use tokio::sync::mpsc::Sender;
use tokio::sync::{TryAcquireError, mpsc, oneshot};
use tracing::{Instrument, Span, debug, debug_span, error, trace, warn};

use crate::async_runtime::IsolatedRuntime;
use crate::batch::{BatchBuilderConfig, BatchBuilderInternal, BatchParts, PartDeletes};
use crate::cfg::{
    COMPACTION_HEURISTIC_MIN_INPUTS, COMPACTION_HEURISTIC_MIN_PARTS,
    COMPACTION_HEURISTIC_MIN_UPDATES, COMPACTION_MEMORY_BOUND_BYTES,
    GC_BLOB_DELETE_CONCURRENCY_LIMIT, MiB,
};
use crate::fetch::FetchBatchFilter;
use crate::internal::encoding::Schemas;
use crate::internal::gc::GarbageCollector;
use crate::internal::machine::Machine;
use crate::internal::maintenance::RoutineMaintenance;
use crate::internal::metrics::ShardMetrics;
use crate::internal::state::{HollowBatch, RunMeta, RunOrder, RunPart};
use crate::internal::trace::{ApplyMergeResult, FueledMergeRes};
use crate::iter::{Consolidator, StructuredSort};
use crate::{Metrics, PersistConfig, ShardId};

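/// A request for compaction: a set of contiguous input batches of a shard to
/// be merged into a single output batch covering `desc`.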
#[derive(Debug, Clone)]
pub struct CompactReq<T> {
    /// The shard the input and output batches belong to.
    pub shard_id: ShardId,
    /// A description for the output batch.
    pub desc: Description<T>,
    /// The updates to include in the output batch.
    pub inputs: Vec<HollowBatch<T>>,
}

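/// The result of a compaction: a single batch covering the requested
/// description.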
#[derive(Debug)]
pub struct CompactRes<T> {
    /// The compacted batch.
    pub output: HollowBatch<T>,
}

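/// Knobs that control how a single compaction request is executed.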
#[derive(Debug, Clone)]
pub struct CompactConfig {
    pub(crate) compaction_memory_bound_bytes: usize,
    pub(crate) compaction_yield_after_n_updates: usize,
    pub(crate) version: semver::Version,
    pub(crate) batch: BatchBuilderConfig,
}

impl CompactConfig {
    /// Initialize the compaction config for a shard from the full persist config.
    pub fn new(value: &PersistConfig, shard_id: ShardId) -> Self {
        CompactConfig {
            compaction_memory_bound_bytes: COMPACTION_MEMORY_BOUND_BYTES.get(value),
            compaction_yield_after_n_updates: value.compaction_yield_after_n_updates,
            version: value.build_version.clone(),
            batch: BatchBuilderConfig::new(value, shard_id),
        }
    }
}

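/// A service for performing compaction in the background, off of the main
/// write path.
///
/// Requests are enqueued onto a bounded channel and executed by background
/// tasks, subject to a configurable concurrency limit.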
#[derive(Debug)]
pub struct Compactor<K, V, T, D> {
    cfg: PersistConfig,
    metrics: Arc<Metrics>,
    sender: Sender<(
        Instant,
        CompactReq<T>,
        Machine<K, V, T, D>,
        oneshot::Sender<Result<ApplyMergeResult, anyhow::Error>>,
    )>,
    _phantom: PhantomData<fn() -> D>,
}

impl<K, V, T, D> Clone for Compactor<K, V, T, D> {
    fn clone(&self) -> Self {
        Compactor {
            cfg: self.cfg.clone(),
            metrics: Arc::clone(&self.metrics),
            sender: self.sender.clone(),
            _phantom: Default::default(),
        }
    }
}

pub(crate) const COMPACTION_MINIMUM_TIMEOUT: Config<Duration> = Config::new(
    "persist_compaction_minimum_timeout",
    Duration::from_secs(90),
    "\
    The minimum amount of time to allow a persist compaction request to run \
    before timing it out (Materialize).",
);

pub(crate) const COMPACTION_USE_MOST_RECENT_SCHEMA: Config<bool> = Config::new(
    "persist_compaction_use_most_recent_schema",
    true,
    "\
    Use the most recent schema from all the Runs that are currently being \
    compacted, instead of the schema on the current write handle (Materialize).
    ",
);

pub(crate) const COMPACTION_CHECK_PROCESS_FLAG: Config<bool> = Config::new(
    "persist_compaction_check_process_flag",
    true,
    "Whether Compactor will obey the process_requests flag in PersistConfig, \
    which allows dynamically disabling compaction. If false, all compaction requests will be processed.",
);

impl<K, V, T, D> Compactor<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64 + Sync,
    D: Semigroup + Ord + Codec64 + Send + Sync,
{
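    /// Creates a compactor and spawns the background task that schedules and
    /// runs queued compaction requests.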
    pub fn new(
        cfg: PersistConfig,
        metrics: Arc<Metrics>,
        write_schemas: Schemas<K, V>,
        gc: GarbageCollector<K, V, T, D>,
    ) -> Self {
        let (compact_req_sender, mut compact_req_receiver) = mpsc::channel::<(
            Instant,
            CompactReq<T>,
            Machine<K, V, T, D>,
            oneshot::Sender<Result<ApplyMergeResult, anyhow::Error>>,
        )>(cfg.compaction_queue_size);
        let concurrency_limit = Arc::new(tokio::sync::Semaphore::new(
            cfg.compaction_concurrency_limit,
        ));
        let check_process_requests = COMPACTION_CHECK_PROCESS_FLAG.handle(&cfg.configs);
        let process_requests = Arc::clone(&cfg.compaction_process_requests);

        // The scheduler task: pull requests off the queue and run each one on
        // its own worker task, subject to the concurrency limit.
        let _worker_handle = mz_ore::task::spawn(|| "PersistCompactionScheduler", async move {
            while let Some((enqueued, req, machine, completer)) = compact_req_receiver.recv().await
            {
                assert_eq!(req.shard_id, machine.shard_id());
                let metrics = Arc::clone(&machine.applier.metrics);

                // If compaction has been dynamically disabled, fail the
                // request instead of running it.
                if check_process_requests.get()
                    && !process_requests.load(std::sync::atomic::Ordering::Relaxed)
                {
                    // The requester may have dropped the receiver; ignore send errors.
                    let _ = completer.send(Err(anyhow::anyhow!("compaction disabled")));
                    metrics.compaction.disabled.inc();
                    tracing::warn!(shard_id = ?req.shard_id, "Dropping compaction request on the floor.");

                    continue;
                }

                let permit = {
                    let inner = Arc::clone(&concurrency_limit);
                    // Try a non-blocking acquire first so we can count how
                    // often compaction has to wait on the concurrency limit.
                    match inner.try_acquire_owned() {
                        Ok(permit) => permit,
                        Err(TryAcquireError::NoPermits) => {
                            metrics.compaction.concurrency_waits.inc();
                            Arc::clone(&concurrency_limit)
                                .acquire_owned()
                                .await
                                .expect("semaphore is never closed")
                        }
                        Err(TryAcquireError::Closed) => {
                            // We never close the semaphore, so this is unexpected.
                            warn!("semaphore for shard {} is closed", machine.shard_id());
                            continue;
                        }
                    }
                };
                metrics
                    .compaction
                    .queued_seconds
                    .inc_by(enqueued.elapsed().as_secs_f64());

                let write_schemas = write_schemas.clone();

                let compact_span =
                    debug_span!(parent: None, "compact::apply", shard_id=%machine.shard_id());
                compact_span.follows_from(&Span::current());
                let gc = gc.clone();
                mz_ore::task::spawn(|| "PersistCompactionWorker", async move {
                    let res = Self::compact_and_apply(&machine, req, write_schemas)
                        .instrument(compact_span)
                        .await;
                    let res = res.map(|(res, maintenance)| {
                        maintenance.start_performing(&machine, &gc);
                        res
                    });

                    // The requester may have dropped the receiver; ignore send errors.
                    let _ = completer.send(res);

                    // Hold the permit for the lifetime of the worker task.
                    drop(permit);
                });
            }
        });

        Compactor {
            cfg,
            metrics,
            sender: compact_req_sender,
            _phantom: PhantomData,
        }
    }

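    /// Enqueues a [CompactReq] to be run in the background, returning a
    /// receiver for the eventual [ApplyMergeResult].
    ///
    /// Returns None if the inputs don't meet the compaction heuristics or if
    /// the request can't be enqueued.
    ///
    /// A minimal usage sketch (caller names are illustrative, not part of
    /// this API):
    ///
    /// ```ignore
    /// if let Some(rx) = compactor.compact_and_apply_background(req, &machine) {
    ///     // Waiting on the receiver is optional; dropping it is also fine.
    ///     let _merge_result = rx.await;
    /// }
    /// ```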
    pub fn compact_and_apply_background(
        &self,
        req: CompactReq<T>,
        machine: &Machine<K, V, T, D>,
    ) -> Option<oneshot::Receiver<Result<ApplyMergeResult, anyhow::Error>>> {
        let should_compact = req.inputs.len() >= COMPACTION_HEURISTIC_MIN_INPUTS.get(&self.cfg)
            || req.inputs.iter().map(|x| x.part_count()).sum::<usize>()
                >= COMPACTION_HEURISTIC_MIN_PARTS.get(&self.cfg)
            || req.inputs.iter().map(|x| x.len).sum::<usize>()
                >= COMPACTION_HEURISTIC_MIN_UPDATES.get(&self.cfg);
        if !should_compact {
            self.metrics.compaction.skipped.inc();
            return None;
        }

        let (compaction_completed_sender, compaction_completed_receiver) = oneshot::channel();
        let new_compaction_sender = self.sender.clone();

        self.metrics.compaction.requested.inc();
        let send = new_compaction_sender.try_send((
            Instant::now(),
            req,
            machine.clone(),
            compaction_completed_sender,
        ));
        if let Err(_) = send {
            self.metrics.compaction.dropped.inc();
            return None;
        }

        Some(compaction_completed_receiver)
    }

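    /// Runs a single compaction request to completion: picks the schemas to
    /// compact with, enforces a timeout proportional to the input size,
    /// applies the merge result to the shard's state, and deletes the output
    /// parts if the result couldn't be applied.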
    pub(crate) async fn compact_and_apply(
        machine: &Machine<K, V, T, D>,
        req: CompactReq<T>,
        write_schemas: Schemas<K, V>,
    ) -> Result<(ApplyMergeResult, RoutineMaintenance), anyhow::Error> {
        let metrics = Arc::clone(&machine.applier.metrics);
        metrics.compaction.started.inc();
        let start = Instant::now();

        let total_input_bytes = req
            .inputs
            .iter()
            .map(|batch| batch.encoded_size_bytes())
            .sum::<usize>();
        // Allow at least the configured minimum timeout, scaled up by one
        // second per MiB of input.
        let timeout = Duration::max(
            COMPACTION_MINIMUM_TIMEOUT.get(&machine.applier.cfg),
            Duration::from_secs(u64::cast_from(total_input_bytes / MiB)),
        );
        // Prefer the most recent schema registered by any of the input runs.
        let compaction_schema_id = req
            .inputs
            .iter()
            .flat_map(|batch| batch.run_meta.iter())
            .filter_map(|run_meta| run_meta.schema)
            .max();
        let maybe_compaction_schema = match compaction_schema_id {
            Some(id) => machine
                .get_schema(id)
                .map(|(key_schema, val_schema)| (id, key_schema, val_schema)),
            None => None,
        };
        let use_most_recent_schema = COMPACTION_USE_MOST_RECENT_SCHEMA.get(&machine.applier.cfg);

        let compaction_schema = match maybe_compaction_schema {
            Some((id, key_schema, val_schema)) if use_most_recent_schema => {
                metrics.compaction.schema_selection.recent_schema.inc();
                Schemas {
                    id: Some(id),
                    key: Arc::new(key_schema),
                    val: Arc::new(val_schema),
                }
            }
            Some(_) => {
                metrics.compaction.schema_selection.disabled.inc();
                write_schemas
            }
            None => {
                metrics.compaction.schema_selection.no_schema.inc();
                write_schemas
            }
        };

        trace!(
            "compaction request for {}MiBs ({} bytes), with timeout of {}s, and schema {:?}.",
            total_input_bytes / MiB,
            total_input_bytes,
            timeout.as_secs_f64(),
            compaction_schema.id,
        );

        let compact_span = debug_span!("compact::consolidate");
        let res = tokio::time::timeout(
            timeout,
            // Compaction is cpu-intensive, so run it on the isolated runtime
            // to keep it off the main tokio worker threads.
            machine
                .isolated_runtime
                .spawn_named(
                    || "persist::compact::consolidate",
                    Self::compact(
                        CompactConfig::new(&machine.applier.cfg, machine.shard_id()),
                        Arc::clone(&machine.applier.state_versions.blob),
                        Arc::clone(&metrics),
                        Arc::clone(&machine.applier.shard_metrics),
                        Arc::clone(&machine.isolated_runtime),
                        req,
                        compaction_schema,
                    )
                    .instrument(compact_span),
                )
                .map_err(|e| anyhow!(e)),
        )
        .await;

        let res = match res {
            Ok(res) => res,
            Err(err) => {
                metrics.compaction.timed_out.inc();
                Err(anyhow!(err))
            }
        };

        metrics
            .compaction
            .seconds
            .inc_by(start.elapsed().as_secs_f64());

        match res {
            Ok(Ok(res)) => {
                let res = FueledMergeRes { output: res.output };
                let (apply_merge_result, maintenance) = machine.merge_res(&res).await;
                match &apply_merge_result {
                    ApplyMergeResult::AppliedExact => {
                        metrics.compaction.applied.inc();
                        metrics.compaction.applied_exact_match.inc();
                        machine.applier.shard_metrics.compaction_applied.inc();
                        Ok((apply_merge_result, maintenance))
                    }
                    ApplyMergeResult::AppliedSubset => {
                        metrics.compaction.applied.inc();
                        metrics.compaction.applied_subset_match.inc();
                        machine.applier.shard_metrics.compaction_applied.inc();
                        Ok((apply_merge_result, maintenance))
                    }
                    ApplyMergeResult::NotAppliedNoMatch
                    | ApplyMergeResult::NotAppliedInvalidSince
                    | ApplyMergeResult::NotAppliedTooManyUpdates => {
                        if let ApplyMergeResult::NotAppliedTooManyUpdates = &apply_merge_result {
                            metrics.compaction.not_applied_too_many_updates.inc();
                        }
                        metrics.compaction.noop.inc();
                        // The merge result wasn't applied, so the newly
                        // written output parts are unreachable; delete them.
                        let mut part_deletes = PartDeletes::default();
                        for part in res.output.parts {
                            part_deletes.add(&part);
                        }
                        let () = part_deletes
                            .delete(
                                machine.applier.state_versions.blob.as_ref(),
                                machine.shard_id(),
                                GC_BLOB_DELETE_CONCURRENCY_LIMIT.get(&machine.applier.cfg),
                                &*metrics,
                                &metrics.retries.external.compaction_noop_delete,
                            )
                            .await;
                        Ok((apply_merge_result, maintenance))
                    }
                }
            }
            Ok(Err(err)) | Err(err) => {
                metrics.compaction.failed.inc();
                debug!(
                    "compaction for {} failed: {}",
                    machine.shard_id(),
                    err.display_with_causes()
                );
                Err(err)
            }
        }
    }

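    /// Compacts the input batches of a [CompactReq] into a single output
    /// batch.
    ///
    /// The work is broken into chunks so that memory stays within
    /// `compaction_memory_bound_bytes`: part of the budget is reserved for
    /// the in-progress output part and the rest is handed to
    /// [Self::chunk_runs] to decide how many input runs may be merged
    /// concurrently.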
    pub async fn compact(
        cfg: CompactConfig,
        blob: Arc<dyn Blob>,
        metrics: Arc<Metrics>,
        shard_metrics: Arc<ShardMetrics>,
        isolated_runtime: Arc<IsolatedRuntime>,
        req: CompactReq<T>,
        write_schemas: Schemas<K, V>,
    ) -> Result<CompactRes<T>, anyhow::Error> {
        let () = Self::validate_req(&req)?;

        // Detect requests that consist of a single non-empty batch which is
        // already consolidated into one run and has a non-trivial since; such
        // requests would be eligible for a fast path, which we record in
        // metrics (no fast path is taken here).
        let mut single_nonempty_batch = None;
        for batch in &req.inputs {
            if batch.len > 0 {
                match single_nonempty_batch {
                    None => single_nonempty_batch = Some(batch),
                    Some(_previous_nonempty_batch) => {
                        single_nonempty_batch = None;
                        break;
                    }
                }
            }
        }
        if let Some(single_nonempty_batch) = single_nonempty_batch {
            if single_nonempty_batch.run_splits.len() == 0
                && single_nonempty_batch.desc.since() != &Antichain::from_elem(T::minimum())
            {
                metrics.compaction.fast_path_eligible.inc();
            }
        }

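        // Reserve a portion of the memory bound for the in-progress output
        // part; the remainder is the budget for fetching input run parts.
        // For example (illustrative numbers, not defaults): with a 128 MiB
        // memory bound and a 16 MiB blob target size, 32 MiB is reserved for
        // in-progress parts and the remaining 96 MiB is the run budget passed
        // to chunk_runs below.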
        assert!(cfg.compaction_memory_bound_bytes >= 4 * cfg.batch.blob_target_size);
        let in_progress_part_reserved_memory_bytes = 2 * cfg.batch.blob_target_size;
        let run_reserved_memory_bytes =
            cfg.compaction_memory_bound_bytes - in_progress_part_reserved_memory_bytes;

        let mut all_parts = vec![];
        let mut all_run_splits = vec![];
        let mut all_run_meta = vec![];
        let mut len = 0;

        let ordered_runs =
            Self::order_runs(&req, cfg.batch.preferred_order, &*blob, &*metrics).await?;
        for (runs, run_chunk_max_memory_usage) in
            Self::chunk_runs(&ordered_runs, &cfg, &*metrics, run_reserved_memory_bytes)
        {
            metrics.compaction.chunks_compacted.inc();
            metrics
                .compaction
                .runs_compacted
                .inc_by(u64::cast_from(runs.len()));

            // If this chunk doesn't use the full run budget, let the batch
            // builder keep proportionally more parts outstanding.
            let extra_outstanding_parts = (run_reserved_memory_bytes
                .saturating_sub(run_chunk_max_memory_usage))
                / cfg.batch.blob_target_size;
            let mut run_cfg = cfg.clone();
            run_cfg.batch.batch_builder_max_outstanding_parts = 1 + extra_outstanding_parts;
            let batch = Self::compact_runs(
                &run_cfg,
                &req.shard_id,
                &req.desc,
                runs,
                Arc::clone(&blob),
                Arc::clone(&metrics),
                Arc::clone(&shard_metrics),
                Arc::clone(&isolated_runtime),
                write_schemas.clone(),
            )
            .await?;
            let (parts, run_splits, run_meta, updates) =
                (batch.parts, batch.run_splits, batch.run_meta, batch.len);
            assert!(
                (updates == 0 && parts.len() == 0) || (updates > 0 && parts.len() > 0),
                "updates={}, parts={}",
                updates,
                parts.len(),
            );

            if updates == 0 {
                continue;
            }
            // Each chunk produces a batch whose run splits index into its own
            // parts; shift them by the parts already accumulated, and mark a
            // new run starting at the chunk boundary.
            let run_offset = all_parts.len();
            if all_parts.len() > 0 {
                all_run_splits.push(run_offset);
            }
            all_run_splits.extend(run_splits.iter().map(|run_start| run_start + run_offset));
            all_run_meta.extend(run_meta);
            all_parts.extend(parts);
            len += updates;
        }

        Ok(CompactRes {
            output: HollowBatch::new(
                req.desc.clone(),
                all_parts,
                len,
                all_run_meta,
                all_run_splits,
            ),
        })
    }

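    /// Sorts and groups all runs from the inputs into chunks, each of which
    /// is expected to stay within `run_reserved_memory_bytes` (estimated from
    /// the largest part of each run). Chunks are never cut down to a single
    /// run just to satisfy the budget; instead the overage is recorded as a
    /// memory violation.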
    fn chunk_runs<'a>(
        ordered_runs: &'a [(&'a Description<T>, &'a RunMeta, Cow<'a, [RunPart<T>]>)],
        cfg: &CompactConfig,
        metrics: &Metrics,
        run_reserved_memory_bytes: usize,
    ) -> Vec<(
        Vec<(&'a Description<T>, &'a RunMeta, &'a [RunPart<T>])>,
        usize,
    )> {
        let mut ordered_runs = ordered_runs.into_iter().peekable();

        let mut chunks = vec![];
        let mut current_chunk = vec![];
        let mut current_chunk_max_memory_usage = 0;
        while let Some((desc, meta, run)) = ordered_runs.next() {
            let run_greatest_part_size = run
                .iter()
                .map(|x| x.max_part_bytes())
                .max()
                .unwrap_or(cfg.batch.blob_target_size);
            current_chunk.push((*desc, *meta, &**run));
            current_chunk_max_memory_usage += run_greatest_part_size;

            if let Some((_next_desc, _next_meta, next_run)) = ordered_runs.peek() {
                let next_run_greatest_part_size = next_run
                    .iter()
                    .map(|x| x.max_part_bytes())
                    .max()
                    .unwrap_or(cfg.batch.blob_target_size);

                // If the next run also fits within the budget, keep growing
                // the current chunk.
                if current_chunk_max_memory_usage + next_run_greatest_part_size
                    <= run_reserved_memory_bytes
                {
                    continue;
                }

                // There's only one run in this chunk and the next run doesn't
                // fit. Compacting a single run wouldn't reduce the number of
                // runs, so take the next run anyway and note the violation.
                if current_chunk.len() == 1 {
                    metrics.compaction.memory_violations.inc();
                    continue;
                }
            }

            chunks.push((
                std::mem::take(&mut current_chunk),
                current_chunk_max_memory_usage,
            ));
            current_chunk_max_memory_usage = 0;
        }

        chunks
    }

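    /// Flattens the input batches into a list of runs, visiting the batches
    /// round-robin so their runs are interleaved. Runs that were written in
    /// `target_order` are kept intact; any other run is broken up into
    /// single-part runs for the consolidator to merge.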
    async fn order_runs<'a>(
        req: &'a CompactReq<T>,
        target_order: RunOrder,
        blob: &'a dyn Blob,
        metrics: &'a Metrics,
    ) -> anyhow::Result<Vec<(&'a Description<T>, &'a RunMeta, Cow<'a, [RunPart<T>]>)>> {
        let total_number_of_runs = req
            .inputs
            .iter()
            .map(|x| x.run_splits.len() + 1)
            .sum::<usize>();

        let mut batch_runs: VecDeque<_> = req
            .inputs
            .iter()
            .map(|batch| (&batch.desc, batch.runs()))
            .collect();

        let mut ordered_runs = Vec::with_capacity(total_number_of_runs);

        while let Some((desc, mut runs)) = batch_runs.pop_front() {
            if let Some((meta, run)) = runs.next() {
                let same_order = meta.order.unwrap_or(RunOrder::Codec) == target_order;
                if same_order {
                    ordered_runs.push((desc, meta, Cow::Borrowed(run)));
                } else {
                    // The run isn't in the target order, so break it into
                    // single-part runs, which the consolidation step below
                    // can handle.
                    for part in run {
                        let mut batch_parts = pin!(part.part_stream(req.shard_id, blob, metrics));
                        while let Some(part) = batch_parts.next().await {
                            ordered_runs.push((
                                desc,
                                meta,
                                Cow::Owned(vec![RunPart::Single(part?.into_owned())]),
                            ));
                        }
                    }
                }
                batch_runs.push_back((desc, runs));
            }
        }

        Ok(ordered_runs)
    }

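    /// Compacts one chunk of runs into a single [HollowBatch] by streaming
    /// their parts through a [Consolidator] and writing the consolidated
    /// output via a [BatchBuilderInternal].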
    pub(crate) async fn compact_runs(
        cfg: &CompactConfig,
        shard_id: &ShardId,
        desc: &Description<T>,
        runs: Vec<(&Description<T>, &RunMeta, &[RunPart<T>])>,
        blob: Arc<dyn Blob>,
        metrics: Arc<Metrics>,
        shard_metrics: Arc<ShardMetrics>,
        isolated_runtime: Arc<IsolatedRuntime>,
        write_schemas: Schemas<K, V>,
    ) -> Result<HollowBatch<T>, anyhow::Error> {
        // Budget for fetching ahead of the consolidator: two target-size parts.
        let prefetch_budget_bytes = 2 * cfg.batch.blob_target_size;

        let mut timings = Timings::default();

        let mut batch_cfg = cfg.batch.clone();

        // Never write inline parts from compaction; all output data should
        // land in blob storage.
        batch_cfg.inline_writes_single_max_bytes = 0;

        let parts = BatchParts::new_ordered(
            batch_cfg,
            cfg.batch.preferred_order,
            Arc::clone(&metrics),
            Arc::clone(&shard_metrics),
            *shard_id,
            Arc::clone(&blob),
            Arc::clone(&isolated_runtime),
            &metrics.compaction.batch,
        );
        let mut batch = BatchBuilderInternal::<K, V, T, D>::new(
            cfg.batch.clone(),
            parts,
            Arc::clone(&metrics),
            write_schemas.clone(),
            Arc::clone(&blob),
            shard_id.clone(),
            cfg.version.clone(),
        );

        let mut consolidator = Consolidator::new(
            format!(
                "{}[lower={:?},upper={:?}]",
                shard_id,
                desc.lower().elements(),
                desc.upper().elements()
            ),
            *shard_id,
            StructuredSort::<K, V, T, D>::new(write_schemas.clone()),
            blob,
            Arc::clone(&metrics),
            shard_metrics,
            metrics.read.compaction.clone(),
            FetchBatchFilter::Compaction {
                since: desc.since().clone(),
            },
            prefetch_budget_bytes,
        );

        for (desc, meta, parts) in runs {
            consolidator.enqueue_run(desc, meta, parts.iter().cloned());
        }

        let remaining_budget = consolidator.start_prefetches();
        if remaining_budget.is_none() {
            metrics.compaction.not_all_prefetched.inc();
        }

        loop {
            // Pull consolidated chunks until we've accumulated roughly one
            // blob-target-size part, then flush them as a single part.
            let mut chunks = vec![];
            let mut total_bytes = 0;
            while total_bytes < cfg.batch.blob_target_size {
                let fetch_start = Instant::now();
                let Some(chunk) = consolidator
                    .next_chunk(
                        cfg.compaction_yield_after_n_updates,
                        cfg.batch.blob_target_size - total_bytes,
                    )
                    .await?
                else {
                    break;
                };
                timings.part_fetching += fetch_start.elapsed();
                total_bytes += chunk.goodbytes();
                chunks.push(chunk);
                tokio::task::yield_now().await;
            }

            // If the consolidator produced no chunks, it's exhausted and
            // we're done.
            let Some(updates) = Part::concat(&chunks).expect("compaction produces well-typed data")
            else {
                break;
            };
            batch.flush_part(desc.clone(), updates).await;
        }
        let mut batch = batch.finish(desc.clone()).await?;

        // We disabled inline writes above, so any inline parts here are
        // unexpected; flush them out to blob before returning.
        let has_inline_parts = batch.batch.parts.iter().any(|x| x.is_inline());
        if has_inline_parts {
            error!(%shard_id, ?cfg, "compaction result unexpectedly had inline writes");
            let () = batch
                .flush_to_blob(
                    &cfg.batch,
                    &metrics.compaction.batch,
                    &isolated_runtime,
                    &write_schemas,
                )
                .await;
        }

        timings.record(&metrics);
        Ok(batch.into_hollow_batch())
    }

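    /// Validates that the inputs are contiguous (each batch's upper matches
    /// the next batch's lower, spanning exactly the requested description)
    /// and that no input's since is in advance of the output's since.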
    fn validate_req(req: &CompactReq<T>) -> Result<(), anyhow::Error> {
        let mut frontier = req.desc.lower();
        for input in req.inputs.iter() {
            if PartialOrder::less_than(req.desc.since(), input.desc.since()) {
                return Err(anyhow!(
                    "output since {:?} must be at or in advance of input since {:?}",
                    req.desc.since(),
                    input.desc.since()
                ));
            }
            if frontier != input.desc.lower() {
                return Err(anyhow!(
                    "invalid merge of non-consecutive batches {:?} vs {:?}",
                    frontier,
                    input.desc.lower()
                ));
            }
            frontier = input.desc.upper();
        }
        if frontier != req.desc.upper() {
            return Err(anyhow!(
                "invalid merge of non-consecutive batches {:?} vs {:?}",
                frontier,
                req.desc.upper()
            ));
        }
        Ok(())
    }
}

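/// Wall-clock durations of the expensive steps of a compaction, recorded into
/// metrics when the compaction finishes.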
#[derive(Debug, Default)]
struct Timings {
    part_fetching: Duration,
    heap_population: Duration,
}

impl Timings {
    fn record(self, metrics: &Metrics) {
        // Destructure `self` so that adding a field to Timings forces an
        // update here.
        let Timings {
            part_fetching,
            heap_population,
        } = self;

        metrics
            .compaction
            .steps
            .part_fetch_seconds
            .inc_by(part_fetching.as_secs_f64());
        metrics
            .compaction
            .steps
            .heap_population_seconds
            .inc_by(heap_population.as_secs_f64());
    }
}

#[cfg(test)]
mod tests {
    use mz_dyncfg::ConfigUpdates;
    use mz_ore::{assert_contains, assert_err};
    use mz_persist_types::codec_impls::StringSchema;
    use timely::order::Product;
    use timely::progress::Antichain;

    use crate::PersistLocation;
    use crate::batch::BLOB_TARGET_SIZE;
    use crate::tests::{all_ok, expect_fetch_part, new_test_client_cache};

    use super::*;

    // Regression test: the output of compaction should respect the since in
    // the requested description, not fall back to the minimum timestamp.
    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn regression_minimum_since(dyncfgs: ConfigUpdates) {
        let data = vec![
            (("0".to_owned(), "zero".to_owned()), 0, 1),
            (("0".to_owned(), "zero".to_owned()), 1, -1),
            (("1".to_owned(), "one".to_owned()), 1, 1),
        ];

        let cache = new_test_client_cache(&dyncfgs);
        cache.cfg.set_config(&BLOB_TARGET_SIZE, 100);
        let (mut write, _) = cache
            .open(PersistLocation::new_in_mem())
            .await
            .expect("client construction failed")
            .expect_open::<String, String, u64, i64>(ShardId::new())
            .await;
        let b0 = write
            .expect_batch(&data[..1], 0, 1)
            .await
            .into_hollow_batch();
        let b1 = write
            .expect_batch(&data[1..], 1, 2)
            .await
            .into_hollow_batch();

        let req = CompactReq {
            shard_id: write.machine.shard_id(),
            desc: Description::new(
                b0.desc.lower().clone(),
                b1.desc.upper().clone(),
                Antichain::from_elem(10u64),
            ),
            inputs: vec![b0, b1],
        };
        let schemas = Schemas {
            id: None,
            key: Arc::new(StringSchema),
            val: Arc::new(StringSchema),
        };
        let res = Compactor::<String, String, u64, i64>::compact(
            CompactConfig::new(&write.cfg, write.shard_id()),
            Arc::clone(&write.blob),
            Arc::clone(&write.metrics),
            write.metrics.shards.shard(&write.machine.shard_id(), ""),
            Arc::new(IsolatedRuntime::default()),
            req.clone(),
            schemas.clone(),
        )
        .await
        .expect("compaction failed");

        assert_eq!(res.output.desc, req.desc);
        assert_eq!(res.output.len, 1);
        assert_eq!(res.output.part_count(), 1);
        let part = res.output.parts[0].expect_hollow_part();
        let (part, updates) = expect_fetch_part(
            write.blob.as_ref(),
            &part.key.complete(&write.machine.shard_id()),
            &write.metrics,
            &schemas,
        )
        .await;
        assert_eq!(part.desc, res.output.desc);
        assert_eq!(updates, all_ok(&data, 10));
    }

    // Compaction should also work with a partially ordered timestamp type
    // (here, Product<u32, u32>).
    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn compaction_partial_order(dyncfgs: ConfigUpdates) {
        let data = vec![
            (("0".to_owned(), "zero".to_owned()), Product::new(0, 10), 1),
            (("1".to_owned(), "one".to_owned()), Product::new(10, 0), 1),
        ];

        let cache = new_test_client_cache(&dyncfgs);
        cache.cfg.set_config(&BLOB_TARGET_SIZE, 100);
        let (mut write, _) = cache
            .open(PersistLocation::new_in_mem())
            .await
            .expect("client construction failed")
            .expect_open::<String, String, Product<u32, u32>, i64>(ShardId::new())
            .await;
        let b0 = write
            .batch(
                &data[..1],
                Antichain::from_elem(Product::new(0, 0)),
                Antichain::from_iter([Product::new(0, 11), Product::new(10, 0)]),
            )
            .await
            .expect("invalid usage")
            .into_hollow_batch();

        let b1 = write
            .batch(
                &data[1..],
                Antichain::from_iter([Product::new(0, 11), Product::new(10, 0)]),
                Antichain::from_elem(Product::new(10, 1)),
            )
            .await
            .expect("invalid usage")
            .into_hollow_batch();

        let req = CompactReq {
            shard_id: write.machine.shard_id(),
            desc: Description::new(
                b0.desc.lower().clone(),
                b1.desc.upper().clone(),
                Antichain::from_elem(Product::new(10, 0)),
            ),
            inputs: vec![b0, b1],
        };
        let schemas = Schemas {
            id: None,
            key: Arc::new(StringSchema),
            val: Arc::new(StringSchema),
        };
        let res = Compactor::<String, String, Product<u32, u32>, i64>::compact(
            CompactConfig::new(&write.cfg, write.shard_id()),
            Arc::clone(&write.blob),
            Arc::clone(&write.metrics),
            write.metrics.shards.shard(&write.machine.shard_id(), ""),
            Arc::new(IsolatedRuntime::default()),
            req.clone(),
            schemas.clone(),
        )
        .await
        .expect("compaction failed");

        assert_eq!(res.output.desc, req.desc);
        assert_eq!(res.output.len, 2);
        assert_eq!(res.output.part_count(), 1);
        let part = res.output.parts[0].expect_hollow_part();
        let (part, updates) = expect_fetch_part(
            write.blob.as_ref(),
            &part.key.complete(&write.machine.shard_id()),
            &write.metrics,
            &schemas,
        )
        .await;
        assert_eq!(part.desc, res.output.desc);
        assert_eq!(updates, all_ok(&data, Product::new(10, 0)));
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn disable_compaction(dyncfgs: ConfigUpdates) {
        let data = [
            (("0".to_owned(), "zero".to_owned()), 0, 1),
            (("0".to_owned(), "zero".to_owned()), 1, -1),
            (("1".to_owned(), "one".to_owned()), 1, 1),
        ];

        let cache = new_test_client_cache(&dyncfgs);
        cache.cfg.set_config(&BLOB_TARGET_SIZE, 100);
        let (mut write, _) = cache
            .open(PersistLocation::new_in_mem())
            .await
            .expect("client construction failed")
            .expect_open::<String, String, u64, i64>(ShardId::new())
            .await;
        let b0 = write
            .expect_batch(&data[..1], 0, 1)
            .await
            .into_hollow_batch();
        let b1 = write
            .expect_batch(&data[1..], 1, 2)
            .await
            .into_hollow_batch();

        let req = CompactReq {
            shard_id: write.machine.shard_id(),
            desc: Description::new(
                b0.desc.lower().clone(),
                b1.desc.upper().clone(),
                Antichain::from_elem(10u64),
            ),
            inputs: vec![b0, b1],
        };
        write.cfg.set_config(&COMPACTION_HEURISTIC_MIN_INPUTS, 1);
        let compactor = write.compact.as_ref().expect("compaction hard disabled");

        // With compaction dynamically disabled, requests are rejected.
        write.cfg.disable_compaction();
        let result = compactor
            .compact_and_apply_background(req.clone(), &write.machine)
            .expect("listener")
            .await
            .expect("channel closed");
        assert_err!(result);
        assert_contains!(result.unwrap_err().to_string(), "compaction disabled");

        // Re-enabling compaction lets the same request succeed.
        write.cfg.enable_compaction();
        compactor
            .compact_and_apply_background(req, &write.machine)
            .expect("listener")
            .await
            .expect("channel closed")
            .expect("compaction success");

        let data2 = [
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("2".to_owned(), "two".to_owned()), 3, -1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
        ];

        let b2 = write
            .expect_batch(&data2[..1], 2, 3)
            .await
            .into_hollow_batch();
        let b3 = write
            .expect_batch(&data2[1..], 3, 4)
            .await
            .into_hollow_batch();

        let req = CompactReq {
            shard_id: write.machine.shard_id(),
            desc: Description::new(
                b2.desc.lower().clone(),
                b3.desc.upper().clone(),
                Antichain::from_elem(20u64),
            ),
            inputs: vec![b2, b3],
        };
        let compactor = write.compact.as_ref().expect("compaction hard disabled");

        // When the check-process-flag config is off, requests are processed
        // even though compaction is nominally disabled.
        write.cfg.set_config(&COMPACTION_CHECK_PROCESS_FLAG, false);
        write.cfg.disable_compaction();
        compactor
            .compact_and_apply_background(req, &write.machine)
            .expect("listener")
            .await
            .expect("channel closed")
            .expect("compaction success");
    }
}