use std::cmp::Ordering;
use std::collections::BTreeMap;
use std::fmt::Debug;
use std::sync::Arc;

use bytes::{Bytes, BytesMut};
use differential_dataflow::lattice::Lattice;
use differential_dataflow::trace::Description;
use mz_ore::assert_none;
use mz_ore::cast::CastFrom;
use mz_persist::location::{SeqNo, VersionedData};
use mz_persist_types::Codec64;
use mz_persist_types::schema::SchemaId;
use mz_proto::TryFromProtoError;
use timely::PartialOrder;
use timely::progress::{Antichain, Timestamp};
use tracing::debug;

use crate::critical::CriticalReaderId;
use crate::internal::paths::PartialRollupKey;
use crate::internal::state::{
    CriticalReaderState, EncodedSchemas, HollowBatch, HollowBlobRef, HollowRollup,
    LeasedReaderState, ProtoStateField, ProtoStateFieldDiffType, ProtoStateFieldDiffs, RunPart,
    State, StateCollections, WriterState,
};
use crate::internal::trace::{
    CompactionInput, FueledMergeRes, SpineId, ThinMerge, ThinSpineBatch, Trace,
};
use crate::read::LeasedReaderId;
use crate::write::WriterId;
use crate::{Metrics, PersistConfig, ShardId};

use StateFieldValDiff::*;

use super::state::{ActiveGc, ActiveRollup};

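/// A diff of a single piece of state: the value was inserted, updated from
/// one value to another, or deleted.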
#[derive(Clone, Debug)]
#[cfg_attr(any(test, debug_assertions), derive(PartialEq))]
pub enum StateFieldValDiff<V> {
    Insert(V),
    Update(V, V),
    Delete(V),
}

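/// A diff of one key's value within a keyed piece of state.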
#[derive(Clone)]
#[cfg_attr(any(test, debug_assertions), derive(PartialEq))]
pub struct StateFieldDiff<K, V> {
    pub key: K,
    pub val: StateFieldValDiff<V>,
}

impl<K: Debug, V: Debug> std::fmt::Debug for StateFieldDiff<K, V> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("StateFieldDiff")
            // Deliberately print the val before the key: the val is usually
            // the more interesting half of the debug output.
            .field("val", &self.val)
            .field("key", &self.key)
            .finish()
    }
}

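/// An incremental diff of shard state, describing the transition from the
/// state at `seqno_from` to the state at `seqno_to`. Unchanged fields have
/// empty diff vecs.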
#[derive(Debug)]
#[cfg_attr(any(test, debug_assertions), derive(Clone, PartialEq))]
pub struct StateDiff<T> {
    pub(crate) applier_version: semver::Version,
    pub(crate) seqno_from: SeqNo,
    pub(crate) seqno_to: SeqNo,
    pub(crate) walltime_ms: u64,
    pub(crate) latest_rollup_key: PartialRollupKey,
    pub(crate) rollups: Vec<StateFieldDiff<SeqNo, HollowRollup>>,
    pub(crate) active_rollup: Vec<StateFieldDiff<(), ActiveRollup>>,
    pub(crate) active_gc: Vec<StateFieldDiff<(), ActiveGc>>,
    pub(crate) hostname: Vec<StateFieldDiff<(), String>>,
    pub(crate) last_gc_req: Vec<StateFieldDiff<(), SeqNo>>,
    pub(crate) leased_readers: Vec<StateFieldDiff<LeasedReaderId, LeasedReaderState<T>>>,
    pub(crate) critical_readers: Vec<StateFieldDiff<CriticalReaderId, CriticalReaderState<T>>>,
    pub(crate) writers: Vec<StateFieldDiff<WriterId, WriterState<T>>>,
    pub(crate) schemas: Vec<StateFieldDiff<SchemaId, EncodedSchemas>>,
    pub(crate) since: Vec<StateFieldDiff<(), Antichain<T>>>,
    pub(crate) legacy_batches: Vec<StateFieldDiff<HollowBatch<T>, ()>>,
    pub(crate) hollow_batches: Vec<StateFieldDiff<SpineId, Arc<HollowBatch<T>>>>,
    pub(crate) spine_batches: Vec<StateFieldDiff<SpineId, ThinSpineBatch<T>>>,
    pub(crate) merges: Vec<StateFieldDiff<SpineId, ThinMerge<T>>>,
}

impl<T: Timestamp + Codec64> StateDiff<T> {
    pub fn new(
        applier_version: semver::Version,
        seqno_from: SeqNo,
        seqno_to: SeqNo,
        walltime_ms: u64,
        latest_rollup_key: PartialRollupKey,
    ) -> Self {
        StateDiff {
            applier_version,
            seqno_from,
            seqno_to,
            walltime_ms,
            latest_rollup_key,
            rollups: Vec::default(),
            active_rollup: Vec::default(),
            active_gc: Vec::default(),
            hostname: Vec::default(),
            last_gc_req: Vec::default(),
            leased_readers: Vec::default(),
            critical_readers: Vec::default(),
            writers: Vec::default(),
            schemas: Vec::default(),
            since: Vec::default(),
            legacy_batches: Vec::default(),
            hollow_batches: Vec::default(),
            spine_batches: Vec::default(),
            merges: Vec::default(),
        }
    }

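    /// Returns the diffs of all batches referenced by this state diff, both
    /// legacy (batch-keyed) and spine-id-keyed hollow batches.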
    pub fn referenced_batches(&self) -> impl Iterator<Item = StateFieldValDiff<&HollowBatch<T>>> {
        let legacy_batches = self
            .legacy_batches
            .iter()
            .filter_map(|diff| match diff.val {
                Insert(()) => Some(Insert(&diff.key)),
                Update((), ()) => None,
                Delete(()) => Some(Delete(&diff.key)),
            });
        let hollow_batches = self.hollow_batches.iter().map(|diff| match &diff.val {
            Insert(batch) => Insert(&**batch),
            Update(before, after) => Update(&**before, &**after),
            Delete(batch) => Delete(&**batch),
        });
        legacy_batches.chain(hollow_batches)
    }
}

impl<T: Timestamp + Lattice + Codec64> StateDiff<T> {
    /// Calculates the diff from `from` to `to`, which must be two versions of
    /// state for the same shard.
    pub fn from_diff(from: &State<T>, to: &State<T>) -> Self {
        // Deconstruct the states so we get a compile failure if new fields
        // are added.
        let State {
            shard_id: from_shard_id,
            seqno: from_seqno,
            hostname: from_hostname,
            walltime_ms: _, // the diff always takes the walltime of `to`
            collections:
                StateCollections {
                    version: _,
                    last_gc_req: from_last_gc_req,
                    rollups: from_rollups,
                    active_rollup: from_active_rollup,
                    active_gc: from_active_gc,
                    leased_readers: from_leased_readers,
                    critical_readers: from_critical_readers,
                    writers: from_writers,
                    schemas: from_schemas,
                    trace: from_trace,
                },
        } = from;
        let State {
            shard_id: to_shard_id,
            seqno: to_seqno,
            walltime_ms: to_walltime_ms,
            hostname: to_hostname,
            collections:
                StateCollections {
                    version: to_applier_version,
                    last_gc_req: to_last_gc_req,
                    rollups: to_rollups,
                    active_rollup: to_active_rollup,
                    active_gc: to_active_gc,
                    leased_readers: to_leased_readers,
                    critical_readers: to_critical_readers,
                    writers: to_writers,
                    schemas: to_schemas,
                    trace: to_trace,
                },
        } = to;
        assert_eq!(from_shard_id, to_shard_id);

        let (_, latest_rollup) = to.latest_rollup();
        let mut diffs = Self::new(
            to_applier_version.clone(),
            *from_seqno,
            *to_seqno,
            *to_walltime_ms,
            latest_rollup.key.clone(),
        );
        diff_field_single(from_hostname, to_hostname, &mut diffs.hostname);
        diff_field_single(from_last_gc_req, to_last_gc_req, &mut diffs.last_gc_req);
        diff_field_sorted_iter(
            from_active_rollup.iter().map(|r| (&(), r)),
            to_active_rollup.iter().map(|r| (&(), r)),
            &mut diffs.active_rollup,
        );
        diff_field_sorted_iter(
            from_active_gc.iter().map(|g| (&(), g)),
            to_active_gc.iter().map(|g| (&(), g)),
            &mut diffs.active_gc,
        );
        diff_field_sorted_iter(from_rollups.iter(), to_rollups, &mut diffs.rollups);
        diff_field_sorted_iter(
            from_leased_readers.iter(),
            to_leased_readers,
            &mut diffs.leased_readers,
        );
        diff_field_sorted_iter(
            from_critical_readers.iter(),
            to_critical_readers,
            &mut diffs.critical_readers,
        );
        diff_field_sorted_iter(from_writers.iter(), to_writers, &mut diffs.writers);
        diff_field_sorted_iter(from_schemas.iter(), to_schemas, &mut diffs.schemas);
        diff_field_single(from_trace.since(), to_trace.since(), &mut diffs.since);

        let from_flat = from_trace.flatten();
        let to_flat = to_trace.flatten();
        diff_field_sorted_iter(
            from_flat.legacy_batches.iter().map(|(k, v)| (&**k, v)),
            to_flat.legacy_batches.iter().map(|(k, v)| (&**k, v)),
            &mut diffs.legacy_batches,
        );
        diff_field_sorted_iter(
            from_flat.hollow_batches.iter(),
            to_flat.hollow_batches.iter(),
            &mut diffs.hollow_batches,
        );
        diff_field_sorted_iter(
            from_flat.spine_batches.iter(),
            to_flat.spine_batches.iter(),
            &mut diffs.spine_batches,
        );
        diff_field_sorted_iter(
            from_flat.merges.iter(),
            to_flat.merges.iter(),
            &mut diffs.merges,
        );
        diffs
    }

    /// The blobs (batches and rollups) that this diff newly references.
    pub(crate) fn blob_inserts(&self) -> impl Iterator<Item = HollowBlobRef<'_, T>> {
        let batches = self
            .referenced_batches()
            .filter_map(|spine_diff| match spine_diff {
                Insert(b) | Update(_, b) => Some(HollowBlobRef::Batch(b)),
                Delete(_) => None,
            });
        let rollups = self
            .rollups
            .iter()
            .filter_map(|rollups_diff| match &rollups_diff.val {
                StateFieldValDiff::Insert(x) | StateFieldValDiff::Update(_, x) => {
                    Some(HollowBlobRef::Rollup(x))
                }
                StateFieldValDiff::Delete(_) => None,
            });
        batches.chain(rollups)
    }

    pub(crate) fn part_deletes(&self) -> impl Iterator<Item = &RunPart<T>> {
        // A part is only safe to delete if it appears in a deleted (or
        // replaced) batch and is not re-referenced by any inserted batch.
        let removed = self
            .referenced_batches()
            .filter_map(|spine_diff| match spine_diff {
                Insert(_) => None,
                Update(a, _) | Delete(a) => Some(a.parts.iter().collect::<Vec<_>>()),
            });

        let added: std::collections::BTreeSet<_> = self
            .referenced_batches()
            .filter_map(|spine_diff| match spine_diff {
                Insert(a) | Update(_, a) => Some(a.parts.iter().collect::<Vec<_>>()),
                Delete(_) => None,
            })
            .flatten()
            .collect();

        removed
            .flatten()
            .filter(move |part| !added.contains(part))
    }

    pub(crate) fn rollup_deletes(&self) -> impl Iterator<Item = &HollowRollup> {
        self.rollups
            .iter()
            .filter_map(|rollups_diff| match &rollups_diff.val {
                Insert(_) => None,
                Update(a, _) | Delete(a) => Some(a),
            })
    }

    /// Validates that applying `diff` to `from_state` produces `to_state`,
    /// and that `diff` roundtrips through its proto encoding.
    #[cfg(any(test, debug_assertions))]
    #[allow(dead_code)]
    pub fn validate_roundtrip<K, V, D>(
        metrics: &Metrics,
        from_state: &crate::internal::state::TypedState<K, V, T, D>,
        diff: &Self,
        to_state: &crate::internal::state::TypedState<K, V, T, D>,
    ) -> Result<(), String>
    where
        K: mz_persist_types::Codec + std::fmt::Debug,
        V: mz_persist_types::Codec + std::fmt::Debug,
        D: differential_dataflow::difference::Monoid + Codec64,
    {
        use mz_proto::RustType;
        use prost::Message;

        use crate::internal::state::ProtoStateDiff;

        let mut roundtrip_state = from_state.clone(from_state.hostname.clone());
        roundtrip_state.apply_diff(metrics, diff.clone())?;

        if &roundtrip_state != to_state {
            return Err(format!(
                "state didn't roundtrip\n from_state {:?}\n to_state {:?}\n rt_state {:?}\n diff {:?}\n",
                from_state, to_state, roundtrip_state, diff
            ));
        }

        let encoded_diff = diff.into_proto().encode_to_vec();
        let roundtrip_diff = Self::from_proto(
            ProtoStateDiff::decode(encoded_diff.as_slice()).map_err(|err| err.to_string())?,
        )
        .map_err(|err| err.to_string())?;

        if &roundtrip_diff != diff {
            return Err(format!(
                "diff didn't roundtrip\n diff {:?}\n rt_diff {:?}",
                diff, roundtrip_diff
            ));
        }

        Ok(())
    }
}

impl<T: Timestamp + Lattice + Codec64> State<T> {
    pub fn apply_encoded_diffs<'a, I: IntoIterator<Item = &'a VersionedData>>(
        &mut self,
        cfg: &PersistConfig,
        metrics: &Metrics,
        diffs: I,
    ) {
        let mut state_seqno = self.seqno;
        let diffs = diffs.into_iter().filter_map(move |x| {
            if x.seqno != state_seqno.next() {
                // The caller may pass diffs that have already been applied;
                // skip anything that isn't the next seqno.
                return None;
            }
            let data = x.data.clone();
            let diff = metrics
                .codecs
                .state_diff
                .decode(|| StateDiff::decode(&cfg.build_version, x.data.clone()));
            assert_eq!(diff.seqno_from, state_seqno);
            state_seqno = diff.seqno_to;
            Some((diff, data))
        });
        self.apply_diffs(metrics, diffs);
    }
}

impl<T: Timestamp + Lattice + Codec64> State<T> {
    pub fn apply_diffs<I: IntoIterator<Item = (StateDiff<T>, Bytes)>>(
        &mut self,
        metrics: &Metrics,
        diffs: I,
    ) {
        for (diff, data) in diffs {
            match self.apply_diff(metrics, diff) {
                Ok(()) => {}
                Err(err) => {
                    // Re-decode the diff from its original bytes so the panic
                    // message can include it.
                    let diff = StateDiff::<T>::decode(&self.collections.version, data);
                    panic!(
                        "state diff should apply cleanly: {} diff {:?} state {:?}",
                        err, diff, self
                    )
                }
            }
        }
    }

    pub(super) fn apply_diff(
        &mut self,
        metrics: &Metrics,
        diff: StateDiff<T>,
    ) -> Result<(), String> {
        // Deconstruct the diff so we get a compile failure if new fields are
        // added.
        let StateDiff {
            applier_version: diff_applier_version,
            seqno_from: diff_seqno_from,
            seqno_to: diff_seqno_to,
            walltime_ms: diff_walltime_ms,
            latest_rollup_key: _,
            rollups: diff_rollups,
            active_rollup: diff_active_rollup,
            active_gc: diff_active_gc,
            hostname: diff_hostname,
            last_gc_req: diff_last_gc_req,
            leased_readers: diff_leased_readers,
            critical_readers: diff_critical_readers,
            writers: diff_writers,
            schemas: diff_schemas,
            since: diff_since,
            legacy_batches: diff_legacy_batches,
            hollow_batches: diff_hollow_batches,
            spine_batches: diff_spine_batches,
            merges: diff_merges,
        } = diff;
        if self.seqno == diff_seqno_to {
            // This diff has already been applied.
            return Ok(());
        }
        if self.seqno != diff_seqno_from {
            return Err(format!(
                "could not apply diff {} -> {} to state {}",
                diff_seqno_from, diff_seqno_to, self.seqno
            ));
        }
        self.seqno = diff_seqno_to;
        self.walltime_ms = diff_walltime_ms;
        force_apply_diffs_single(
            &self.shard_id,
            diff_seqno_to,
            "hostname",
            diff_hostname,
            &mut self.hostname,
            metrics,
        )?;

        // Deconstruct the collections so we get a compile failure if new
        // fields are added.
        let StateCollections {
            version,
            last_gc_req,
            rollups,
            active_rollup,
            active_gc,
            leased_readers,
            critical_readers,
            writers,
            schemas,
            trace,
        } = &mut self.collections;

        *version = diff_applier_version;
        apply_diffs_map("rollups", diff_rollups, rollups)?;
        apply_diffs_single("last_gc_req", diff_last_gc_req, last_gc_req)?;
        apply_diffs_single_option("active_rollup", diff_active_rollup, active_rollup)?;
        apply_diffs_single_option("active_gc", diff_active_gc, active_gc)?;
        apply_diffs_map("leased_readers", diff_leased_readers, leased_readers)?;
        apply_diffs_map("critical_readers", diff_critical_readers, critical_readers)?;
        apply_diffs_map("writers", diff_writers, writers)?;
        apply_diffs_map("schemas", diff_schemas, schemas)?;

        let structure_unchanged = diff_hollow_batches.is_empty()
            && diff_spine_batches.is_empty()
            && diff_merges.is_empty();
        let spine_unchanged =
            diff_since.is_empty() && diff_legacy_batches.is_empty() && structure_unchanged;

        if spine_unchanged {
            return Ok(());
        }

        let mut flat = if trace.roundtrip_structure {
            // The spine structure is roundtripped directly: flatten the
            // trace, apply the diffs to the flat representation, and
            // unflatten it below.
            metrics.state.apply_spine_flattened.inc();
            let mut flat = trace.flatten();
            apply_diffs_single("since", diff_since, &mut flat.since)?;
            apply_diffs_map(
                "legacy_batches",
                diff_legacy_batches
                    .into_iter()
                    .map(|StateFieldDiff { key, val }| StateFieldDiff {
                        key: Arc::new(key),
                        val,
                    }),
                &mut flat.legacy_batches,
            )?;
            Some(flat)
        } else {
            for x in diff_since {
                match x.val {
                    Update(from, to) => {
                        if trace.since() != &from {
                            return Err(format!(
                                "since update didn't match: {:?} vs {:?}",
                                self.collections.trace.since(),
                                &from
                            ));
                        }
                        trace.downgrade_since(&to);
                    }
                    Insert(_) => return Err("cannot insert since field".to_string()),
                    Delete(_) => return Err("cannot delete since field".to_string()),
                }
            }
            if !diff_legacy_batches.is_empty() {
                apply_diffs_spine(metrics, diff_legacy_batches, trace)?;
                debug_assert_eq!(trace.validate(), Ok(()), "{:?}", trace);
            }
            None
        };

        if !structure_unchanged {
            let flat = flat.get_or_insert_with(|| trace.flatten());
            apply_diffs_map(
                "hollow_batches",
                diff_hollow_batches,
                &mut flat.hollow_batches,
            )?;
            apply_diffs_map("spine_batches", diff_spine_batches, &mut flat.spine_batches)?;
            apply_diffs_map("merges", diff_merges, &mut flat.merges)?;
        }

        if let Some(flat) = flat {
            *trace = Trace::unflatten(flat)?;
        }

        Ok(())
    }
}

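/// Computes the diff of a single-valued ("scalar") state field, pushing an
/// `Update` if the two values differ.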
fn diff_field_single<T: PartialEq + Clone>(
    from: &T,
    to: &T,
    diffs: &mut Vec<StateFieldDiff<(), T>>,
) {
    // A single field is always present, so its diff is always an update.
    if from != to {
        diffs.push(StateFieldDiff {
            key: (),
            val: Update(from.clone(), to.clone()),
        })
    }
}

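/// Applies a set of diffs to an optional single-valued state field.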
fn apply_diffs_single_option<X: PartialEq + Debug>(
    name: &str,
    diffs: Vec<StateFieldDiff<(), X>>,
    single: &mut Option<X>,
) -> Result<(), String> {
    for diff in diffs {
        apply_diff_single_option(name, diff, single)?;
    }
    Ok(())
}

fn apply_diff_single_option<X: PartialEq + Debug>(
    name: &str,
    diff: StateFieldDiff<(), X>,
    single: &mut Option<X>,
) -> Result<(), String> {
    match diff.val {
        Update(from, to) => {
            if single.as_ref() != Some(&from) {
                return Err(format!(
                    "{} update didn't match: {:?} vs {:?}",
                    name, single, &from
                ));
            }
            *single = Some(to)
        }
        Insert(to) => {
            if single.is_some() {
                return Err(format!("{} insert found existing value", name));
            }
            *single = Some(to)
        }
        Delete(from) => {
            if single.as_ref() != Some(&from) {
                return Err(format!(
                    "{} delete didn't match: {:?} vs {:?}",
                    name, single, &from
                ));
            }
            *single = None
        }
    }
    Ok(())
}

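/// Applies a set of diffs to a required single-valued state field. Such a
/// field always exists, so only updates are legal: inserts and deletes error.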
fn apply_diffs_single<X: PartialEq + Debug>(
    name: &str,
    diffs: Vec<StateFieldDiff<(), X>>,
    single: &mut X,
) -> Result<(), String> {
    for diff in diffs {
        apply_diff_single(name, diff, single)?;
    }
    Ok(())
}

fn apply_diff_single<X: PartialEq + Debug>(
    name: &str,
    diff: StateFieldDiff<(), X>,
    single: &mut X,
) -> Result<(), String> {
    match diff.val {
        Update(from, to) => {
            if single != &from {
                return Err(format!(
                    "{} update didn't match: {:?} vs {:?}",
                    name, single, &from
                ));
            }
            *single = to
        }
        Insert(_) => return Err(format!("cannot insert {} field", name)),
        Delete(_) => return Err(format!("cannot delete {} field", name)),
    }
    Ok(())
}

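/// Like [apply_diffs_single], but instead of erroring when an update's
/// expected `from` value doesn't match the current value, it logs at debug
/// level, increments a metric, and force-applies the new value. This is only
/// used for the hostname field (see the `force_apply_hostname` metric), where
/// a mismatch isn't worth failing the whole diff application over.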
fn force_apply_diffs_single<X: PartialEq + Debug>(
    shard_id: &ShardId,
    seqno: SeqNo,
    name: &str,
    diffs: Vec<StateFieldDiff<(), X>>,
    single: &mut X,
    metrics: &Metrics,
) -> Result<(), String> {
    for diff in diffs {
        force_apply_diff_single(shard_id, seqno, name, diff, single, metrics)?;
    }
    Ok(())
}

fn force_apply_diff_single<X: PartialEq + Debug>(
    shard_id: &ShardId,
    seqno: SeqNo,
    name: &str,
    diff: StateFieldDiff<(), X>,
    single: &mut X,
    metrics: &Metrics,
) -> Result<(), String> {
    match diff.val {
        Update(from, to) => {
            if single != &from {
                debug!(
                    "{}: update didn't match: {:?} vs {:?}, continuing to force apply diff to {:?} for shard {} and seqno {}",
                    name, single, &from, &to, shard_id, seqno
                );
                metrics.state.force_apply_hostname.inc();
            }
            *single = to
        }
        Insert(_) => return Err(format!("cannot insert {} field", name)),
        Delete(_) => return Err(format!("cannot delete {} field", name)),
    }
    Ok(())
}

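/// Computes the diff between two sorted iterators of key-value pairs with a
/// linear merge: keys only in `to` become `Insert`s, keys only in `from`
/// become `Delete`s, and keys in both whose values differ become `Update`s.
/// Debug-asserts that both iterators yield keys in strictly increasing order.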
fn diff_field_sorted_iter<'a, K, V, IF, IT>(from: IF, to: IT, diffs: &mut Vec<StateFieldDiff<K, V>>)
where
    K: Ord + Clone + 'a,
    V: PartialEq + Clone + 'a,
    IF: IntoIterator<Item = (&'a K, &'a V)>,
    IT: IntoIterator<Item = (&'a K, &'a V)>,
{
    let (mut from, mut to) = (from.into_iter(), to.into_iter());
    let (mut f, mut t) = (from.next(), to.next());
    loop {
        match (f, t) {
            (None, None) => break,
            (Some((fk, fv)), Some((tk, tv))) => match fk.cmp(tk) {
                Ordering::Less => {
                    diffs.push(StateFieldDiff {
                        key: fk.clone(),
                        val: Delete(fv.clone()),
                    });
                    let f_next = from.next();
                    debug_assert!(f_next.as_ref().map_or(true, |(fk_next, _)| fk_next > &fk));
                    f = f_next;
                }
                Ordering::Greater => {
                    diffs.push(StateFieldDiff {
                        key: tk.clone(),
                        val: Insert(tv.clone()),
                    });
                    let t_next = to.next();
                    debug_assert!(t_next.as_ref().map_or(true, |(tk_next, _)| tk_next > &tk));
                    t = t_next;
                }
                Ordering::Equal => {
                    if fv != tv {
                        diffs.push(StateFieldDiff {
                            key: fk.clone(),
                            val: Update(fv.clone(), tv.clone()),
                        });
                    }
                    let f_next = from.next();
                    debug_assert!(f_next.as_ref().map_or(true, |(fk_next, _)| fk_next > &fk));
                    f = f_next;
                    let t_next = to.next();
                    debug_assert!(t_next.as_ref().map_or(true, |(tk_next, _)| tk_next > &tk));
                    t = t_next;
                }
            },
            (None, Some((tk, tv))) => {
                diffs.push(StateFieldDiff {
                    key: tk.clone(),
                    val: Insert(tv.clone()),
                });
                let t_next = to.next();
                debug_assert!(t_next.as_ref().map_or(true, |(tk_next, _)| tk_next > &tk));
                t = t_next;
            }
            (Some((fk, fv)), None) => {
                diffs.push(StateFieldDiff {
                    key: fk.clone(),
                    val: Delete(fv.clone()),
                });
                let f_next = from.next();
                debug_assert!(f_next.as_ref().map_or(true, |(fk_next, _)| fk_next > &fk));
                f = f_next;
            }
        }
    }
}

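/// Applies a set of diffs to a map-valued state field.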
fn apply_diffs_map<K: Ord, V: PartialEq + Debug>(
    name: &str,
    diffs: impl IntoIterator<Item = StateFieldDiff<K, V>>,
    map: &mut BTreeMap<K, V>,
) -> Result<(), String> {
    for diff in diffs {
        apply_diff_map(name, diff, map)?;
    }
    Ok(())
}

fn apply_diff_map<K: Ord, V: PartialEq + Debug>(
    name: &str,
    diff: StateFieldDiff<K, V>,
    map: &mut BTreeMap<K, V>,
) -> Result<(), String> {
    match diff.val {
        Insert(to) => {
            let prev = map.insert(diff.key, to);
            if prev.is_some() {
                return Err(format!("{} insert found existing value: {:?}", name, prev));
            }
        }
        Update(from, to) => {
            let prev = map.insert(diff.key, to);
            if prev.as_ref() != Some(&from) {
                return Err(format!(
                    "{} update didn't match: {:?} vs {:?}",
                    name,
                    prev,
                    Some(from),
                ));
            }
        }
        Delete(from) => {
            let prev = map.remove(&diff.key);
            if prev.as_ref() != Some(&from) {
                return Err(format!(
                    "{} delete didn't match: {:?} vs {:?}",
                    name,
                    prev,
                    Some(from),
                ));
            }
        }
    }
    Ok(())
}

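/// Applies a set of legacy (batch-keyed) diffs to a trace. Several fast paths
/// cover the common cases (an append, an upper-advancing empty batch, a
/// compaction result); anything else falls back to a slow path that rebuilds
/// the trace from its full set of batches.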
fn apply_diffs_spine<T: Timestamp + Lattice + Codec64>(
    metrics: &Metrics,
    mut diffs: Vec<StateFieldDiff<HollowBatch<T>, ()>>,
    trace: &mut Trace<T>,
) -> Result<(), String> {
    // Fast-path: check if the diffs include an append and, if so, apply it
    // directly to the trace.
    if let Some(insert) = sniff_insert(&mut diffs, trace.upper()) {
        // Ignore merge requests: whichever process generated this diff is
        // responsible for the resulting compactions.
        let () = trace.push_batch_no_merge_reqs(insert);
        if diffs.is_empty() {
            metrics.state.apply_spine_fast_path.inc();
            return Ok(());
        }
    }

    match &diffs[..] {
        // Fast-path: no diffs.
        [] => return Ok(()),

        // Fast-path: an empty batch replaced by a larger empty batch with
        // the same lower. Apply it by appending an empty batch covering the
        // gap between the two uppers.
        [
            StateFieldDiff {
                key: del,
                val: StateFieldValDiff::Delete(()),
            },
            StateFieldDiff {
                key: ins,
                val: StateFieldValDiff::Insert(()),
            },
        ] => {
            if del.is_empty()
                && ins.is_empty()
                && del.desc.lower() == ins.desc.lower()
                && PartialOrder::less_than(del.desc.upper(), ins.desc.upper())
            {
                // An empty batch has no data, so its since doesn't matter;
                // use the minimum.
                let () = trace.push_batch_no_merge_reqs(HollowBatch::empty(Description::new(
                    del.desc.upper().clone(),
                    ins.desc.upper().clone(),
                    Antichain::from_elem(T::minimum()),
                )));
                metrics.state.apply_spine_fast_path.inc();
                return Ok(());
            }
        }
        _ => {}
    }

    // Fast-path: check if the diffs look like a compaction result and, if
    // so, apply it directly to the trace.
    if let Some((_inputs, output)) = sniff_compaction(&diffs) {
        let res = FueledMergeRes {
            output,
            input: CompactionInput::Legacy,
            new_active_compaction: None,
        };
        if trace.apply_merge_res_unchecked(&res).applied() {
            metrics.state.apply_spine_fast_path.inc();
            return Ok(());
        }

        // The merge result didn't line up with the spine's structure; apply
        // it leniently against the flat list of batches and rebuild the
        // trace.
        let mut batches = Vec::new();
        trace.map_batches(|b| batches.push(b.clone()));

        match apply_compaction_lenient(metrics, batches, &res.output) {
            Ok(batches) => {
                let mut new_trace = Trace::default();
                new_trace.roundtrip_structure = trace.roundtrip_structure;
                new_trace.downgrade_since(trace.since());
                for batch in batches {
                    // Ignore merge requests, as above.
                    let () = new_trace.push_batch_no_merge_reqs(batch.clone());
                }
                *trace = new_trace;
                metrics.state.apply_spine_slow_path_lenient.inc();
                return Ok(());
            }
            Err(err) => {
                return Err(format!(
                    "lenient compaction result apply unexpectedly failed: {}",
                    err
                ));
            }
        }
    }

    // Slow-path: reconstruct the full set of batches as a map, apply the
    // diffs to it, and rebuild the trace from scratch.
    metrics.state.apply_spine_slow_path.inc();
    debug!(
        "apply_diffs_spine didn't hit a fast-path diffs={:?} trace={:?}",
        diffs, trace
    );

    let batches = {
        let mut batches = BTreeMap::new();
        trace.map_batches(|b| assert_none!(batches.insert(b.clone(), ())));
        apply_diffs_map("spine", diffs.clone(), &mut batches).map(|_ok| batches)
    };

    let batches = match batches {
        Ok(batches) => batches,
        Err(err) => {
            metrics
                .state
                .apply_spine_slow_path_with_reconstruction
                .inc();
            debug!(
                "apply_diffs_spine could not apply diffs directly to existing trace batches: {}. diffs={:?} trace={:?}",
                err, diffs, trace
            );
            // The diffs may have been generated against a trace with a
            // different batch structure; push the existing batches through a
            // fresh spine to renormalize them, then retry.
            let mut reconstructed_spine = Trace::default();
            reconstructed_spine.roundtrip_structure = trace.roundtrip_structure;
            trace.map_batches(|b| {
                // Ignore merge requests, as above.
                let () = reconstructed_spine.push_batch_no_merge_reqs(b.clone());
            });

            let mut batches = BTreeMap::new();
            reconstructed_spine.map_batches(|b| assert_none!(batches.insert(b.clone(), ())));
            apply_diffs_map("spine", diffs, &mut batches)?;
            batches
        }
    };

    let mut new_trace = Trace::default();
    new_trace.roundtrip_structure = trace.roundtrip_structure;
    new_trace.downgrade_since(trace.since());
    for (batch, ()) in batches {
        // Ignore merge requests, as above.
        let () = new_trace.push_batch_no_merge_reqs(batch);
    }
    *trace = new_trace;
    Ok(())
}

fn sniff_insert<T: Timestamp + Lattice>(
    diffs: &mut Vec<StateFieldDiff<HollowBatch<T>, ()>>,
    upper: &Antichain<T>,
) -> Option<HollowBatch<T>> {
    for idx in 0..diffs.len() {
        match &diffs[idx] {
            StateFieldDiff {
                key,
                val: StateFieldValDiff::Insert(()),
            } if key.desc.lower() == upper => return Some(diffs.remove(idx).key),
            _ => continue,
        }
    }
    None
}

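/// Sniffs out whether the given diffs look like the result of a compaction:
/// exactly one insert (the output) plus any number of deletes (the inputs).
/// Returns None otherwise, e.g. if any diff is an update, which a compaction
/// never produces.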
fn sniff_compaction<'a, T: Timestamp + Lattice>(
    diffs: &'a [StateFieldDiff<HollowBatch<T>, ()>],
) -> Option<(Vec<&'a HollowBatch<T>>, HollowBatch<T>)> {
    // A compaction has exactly one insert: its output batch.
    let mut inserts = diffs.iter().flat_map(|x| match x.val {
        StateFieldValDiff::Insert(()) => Some(&x.key),
        _ => None,
    });
    let compaction_output = match inserts.next() {
        Some(x) => x,
        None => return None,
    };
    if inserts.next().is_some() {
        return None;
    }

    // All other diffs must be deletes: the compaction's input batches.
    let mut compaction_inputs = Vec::with_capacity(diffs.len() - 1);
    for diff in diffs.iter() {
        match diff.val {
            StateFieldValDiff::Delete(()) => {
                compaction_inputs.push(&diff.key);
            }
            StateFieldValDiff::Insert(()) => {}
            StateFieldValDiff::Update((), ()) => {
                // Compaction never produces updates.
                return None;
            }
        }
    }

    Some((compaction_inputs, compaction_output.clone()))
}

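/// Leniently applies a compaction result `replacement` to a flat list of
/// batches, tolerating replacement bounds that don't line up exactly with
/// existing batch boundaries. Overlapping batches are removed (erroring if a
/// partially-covered one is non-empty), empty filler batches are synthesized
/// where the replacement doesn't fully cover the batches it displaced, and
/// the result is validated to be contiguous before it's returned.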
fn apply_compaction_lenient<'a, T: Timestamp + Lattice>(
    metrics: &Metrics,
    mut trace: Vec<HollowBatch<T>>,
    replacement: &'a HollowBatch<T>,
) -> Result<Vec<HollowBatch<T>>, String> {
    let mut overlapping_batches = Vec::new();
    trace.retain(|b| {
        let before_replacement = PartialOrder::less_equal(b.desc.upper(), replacement.desc.lower());
        let after_replacement = PartialOrder::less_equal(replacement.desc.upper(), b.desc.lower());
        let overlaps_replacement = !(before_replacement || after_replacement);
        if overlaps_replacement {
            overlapping_batches.push(b.clone());
            false
        } else {
            true
        }
    });

    {
        // If the first overlapping batch starts before the replacement, it
        // must be empty; synthesize an empty batch to cover the gap.
        let first_overlapping_batch = match overlapping_batches.first() {
            Some(x) => x,
            None => return Err("replacement didn't overlap any batches".into()),
        };
        if PartialOrder::less_than(
            first_overlapping_batch.desc.lower(),
            replacement.desc.lower(),
        ) {
            if first_overlapping_batch.len > 0 {
                return Err(format!(
                    "overlapping batch was unexpectedly non-empty: {:?}",
                    first_overlapping_batch
                ));
            }
            let desc = Description::new(
                first_overlapping_batch.desc.lower().clone(),
                replacement.desc.lower().clone(),
                first_overlapping_batch.desc.since().clone(),
            );
            trace.push(HollowBatch::empty(desc));
            metrics.state.apply_spine_slow_path_lenient_adjustment.inc();
        }
    }

    {
        // Likewise if the last overlapping batch extends past the
        // replacement.
        let last_overlapping_batch = match overlapping_batches.last() {
            Some(x) => x,
            None => return Err("replacement didn't overlap any batches".into()),
        };
        if PartialOrder::less_than(
            replacement.desc.upper(),
            last_overlapping_batch.desc.upper(),
        ) {
            if last_overlapping_batch.len > 0 {
                return Err(format!(
                    "overlapping batch was unexpectedly non-empty: {:?}",
                    last_overlapping_batch
                ));
            }
            let desc = Description::new(
                replacement.desc.upper().clone(),
                last_overlapping_batch.desc.upper().clone(),
                last_overlapping_batch.desc.since().clone(),
            );
            trace.push(HollowBatch::empty(desc));
            metrics.state.apply_spine_slow_path_lenient_adjustment.inc();
        }
    }
    trace.push(replacement.clone());

    // Restore the invariant that the batches are sorted by time.
    trace.sort_by(|a, b| a.desc.lower().elements().cmp(b.desc.lower().elements()));

    // Validate that the resulting batches are contiguous: each lower must
    // exactly match the previous upper, starting from the minimum antichain.
    let mut expected_lower = &Antichain::from_elem(T::minimum());
    for b in trace.iter() {
        if b.desc.lower() != expected_lower {
            return Err(format!(
                "lower {:?} did not match expected {:?}: {:?}",
                b.desc.lower(),
                expected_lower,
                trace
            ));
        }
        expected_lower = b.desc.upper();
    }
    Ok(trace)
}

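/// A wrapper that lets us accumulate the data for a [ProtoStateFieldDiffs]
/// into a single growable buffer: the `fields`, `diff_types`, and `data_lens`
/// columns record how to slice `data_buf` back up into individual diffs.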
#[derive(Debug)]
pub struct ProtoStateFieldDiffsWriter {
    data_buf: BytesMut,
    proto: ProtoStateFieldDiffs,
}

impl ProtoStateFieldDiffsWriter {
    /// Record a [ProtoStateField] for our proto.
    pub fn push_field(&mut self, field: ProtoStateField) {
        self.proto.fields.push(i32::from(field));
    }

    /// Record a [ProtoStateFieldDiffType] for our proto.
    pub fn push_diff_type(&mut self, diff_type: ProtoStateFieldDiffType) {
        self.proto.diff_types.push(i32::from(diff_type));
    }

    /// Encode a message into our data buffer, recording its length.
    pub fn encode_proto<M: prost::Message>(&mut self, msg: &M) {
        let len_before = self.data_buf.len();
        self.data_buf.reserve(msg.encoded_len());

        // Note: we use `encode_raw` as opposed to `encode` because `encode`
        // verifies the buffer has enough capacity, and we just reserved
        // exactly the space we need.
        msg.encode_raw(&mut self.data_buf);

        let written_len = self.data_buf.len() - len_before;
        self.proto.data_lens.push(u64::cast_from(written_len));
    }

    pub fn into_proto(self) -> ProtoStateFieldDiffs {
        let ProtoStateFieldDiffsWriter {
            data_buf,
            mut proto,
        } = self;

        // The proto's data_bytes must not have been written to directly.
        assert!(proto.data_bytes.is_empty());

        let data_bytes = data_buf.freeze();
        proto.data_bytes = data_bytes;

        proto
    }
}

impl ProtoStateFieldDiffs {
    pub fn into_writer(mut self) -> ProtoStateFieldDiffsWriter {
        // Create a new buffer that we'll encode data into.
        let mut data_buf = BytesMut::with_capacity(self.data_bytes.len());

        // Take our existing data and copy it into the buffer.
        let existing_data = std::mem::take(&mut self.data_bytes);
        data_buf.extend(existing_data);

        ProtoStateFieldDiffsWriter {
            data_buf,
            proto: self,
        }
    }

    pub fn iter<'a>(&'a self) -> ProtoStateFieldDiffsIter<'a> {
        let len = self.fields.len();
        assert_eq!(self.diff_types.len(), len);

        ProtoStateFieldDiffsIter {
            len,
            diff_idx: 0,
            data_idx: 0,
            data_offset: 0,
            diffs: self,
        }
    }

    pub fn validate(&self) -> Result<(), String> {
        if self.fields.len() != self.diff_types.len() {
            return Err(format!(
                "fields {} and diff_types {} lengths disagree",
                self.fields.len(),
                self.diff_types.len()
            ));
        }

        let mut expected_data_slices = 0;
        for diff_type in self.diff_types.iter() {
            // Each diff has a key.
            expected_data_slices += 1;
            // Inserts and deletes have one value; updates have two.
            match ProtoStateFieldDiffType::try_from(*diff_type) {
                Ok(ProtoStateFieldDiffType::Insert) => expected_data_slices += 1,
                Ok(ProtoStateFieldDiffType::Update) => expected_data_slices += 2,
                Ok(ProtoStateFieldDiffType::Delete) => expected_data_slices += 1,
                Err(_) => return Err(format!("unknown diff_type {}", diff_type)),
            }
        }
        if expected_data_slices != self.data_lens.len() {
            return Err(format!(
                "expected {} data slices got {}",
                expected_data_slices,
                self.data_lens.len()
            ));
        }

        let expected_data_bytes = usize::cast_from(self.data_lens.iter().copied().sum::<u64>());
        if expected_data_bytes != self.data_bytes.len() {
            return Err(format!(
                "expected {} data bytes got {}",
                expected_data_bytes,
                self.data_bytes.len()
            ));
        }

        Ok(())
    }
}

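/// A single decoded diff, borrowing its byte slices from a
/// [ProtoStateFieldDiffs]. For an insert `from` is empty; for a delete `to`
/// is empty.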
#[derive(Debug)]
pub struct ProtoStateFieldDiff<'a> {
    pub key: &'a [u8],
    pub diff_type: ProtoStateFieldDiffType,
    pub from: &'a [u8],
    pub to: &'a [u8],
}

pub struct ProtoStateFieldDiffsIter<'a> {
    len: usize,
    diff_idx: usize,
    data_idx: usize,
    data_offset: usize,
    diffs: &'a ProtoStateFieldDiffs,
}

impl<'a> Iterator for ProtoStateFieldDiffsIter<'a> {
    type Item = Result<(ProtoStateField, ProtoStateFieldDiff<'a>), TryFromProtoError>;

    fn next(&mut self) -> Option<Self::Item> {
        if self.diff_idx >= self.len {
            return None;
        }
        let mut next_data = || {
            let start = self.data_offset;
            let end = start + usize::cast_from(self.diffs.data_lens[self.data_idx]);
            let data = &self.diffs.data_bytes[start..end];
            self.data_idx += 1;
            self.data_offset = end;
            data
        };
        let field = match ProtoStateField::try_from(self.diffs.fields[self.diff_idx]) {
            Ok(x) => x,
            Err(_) => {
                return Some(Err(TryFromProtoError::unknown_enum_variant(format!(
                    "ProtoStateField({})",
                    self.diffs.fields[self.diff_idx]
                ))));
            }
        };
        let diff_type =
            match ProtoStateFieldDiffType::try_from(self.diffs.diff_types[self.diff_idx]) {
                Ok(x) => x,
                Err(_) => {
                    return Some(Err(TryFromProtoError::unknown_enum_variant(format!(
                        "ProtoStateFieldDiffType({})",
                        self.diffs.diff_types[self.diff_idx]
                    ))));
                }
            };
        let key = next_data();
        let (from, to): (&[u8], &[u8]) = match diff_type {
            ProtoStateFieldDiffType::Insert => (&[], next_data()),
            ProtoStateFieldDiffType::Update => (next_data(), next_data()),
            ProtoStateFieldDiffType::Delete => (next_data(), &[]),
        };
        let diff = ProtoStateFieldDiff {
            key,
            diff_type,
            from,
            to,
        };
        self.diff_idx += 1;
        Some(Ok((field, diff)))
    }
}

#[cfg(test)]
mod tests {
    use std::ops::ControlFlow::Continue;

    use mz_ore::metrics::MetricsRegistry;
    use semver::Version;

    use crate::ShardId;
    use crate::internal::paths::{PartId, PartialBatchKey, RollupId, WriterKey};
    use crate::internal::state::TypedState;

    use super::*;

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn test_state_sync() {
        use proptest::prelude::*;

        #[derive(Debug, Clone)]
        enum Action {
            Append { empty: bool, time_delta: u64 },
            Compact { req: usize },
        }

        let action_gen: BoxedStrategy<Action> = {
            prop::strategy::Union::new([
                (any::<bool>(), 1u64..10u64)
                    .prop_map(|(empty, time_delta)| Action::Append { empty, time_delta })
                    .boxed(),
                (0usize..10usize)
                    .prop_map(|req| Action::Compact { req })
                    .boxed(),
            ])
            .boxed()
        };

        fn run(actions: Vec<(Action, bool)>, metrics: &Metrics) {
            let version = Version::new(0, 100, 0);
            let writer_key = WriterKey::Version(version.to_string());
            let id = ShardId::new();
            let hostname = "computer";
            let typed: TypedState<String, (), u64, i64> =
                TypedState::new(version, id, hostname.to_string(), 0);
            let mut leader = typed.state;

            let seqno = SeqNo::minimum();
            let mut lower = 0u64;
            let mut merge_reqs = vec![];

            leader.collections.rollups.insert(
                seqno,
                HollowRollup {
                    key: PartialRollupKey::new(seqno, &RollupId::new()),
                    encoded_size_bytes: None,
                },
            );
            leader.collections.trace.roundtrip_structure = false;
            let mut follower = leader.clone();

            for (action, roundtrip_structure) in actions {
                // Apply the given action and the new roundtrip_structure
                // setting, then take a diff.
                let mut old_leader = leader.clone();
                match action {
                    Action::Append { empty, time_delta } => {
                        let upper = lower + time_delta;
                        let key = if empty {
                            None
                        } else {
                            let id = PartId::new();
                            Some(PartialBatchKey::new(&writer_key, &id))
                        };

                        let keys = key.as_ref().map(|k| k.0.as_str());
                        let reqs = leader.collections.trace.push_batch(
                            crate::internal::state::tests::hollow(
                                lower,
                                upper,
                                keys.as_slice(),
                                if empty { 0 } else { 1 },
                            ),
                        );
                        merge_reqs.extend(reqs);
                        lower = upper;
                    }
                    Action::Compact { req } => {
                        if !merge_reqs.is_empty() {
                            let req = merge_reqs.remove(req.min(merge_reqs.len() - 1));
                            let len = req.inputs.iter().map(|p| p.batch.len).sum();
                            let parts = req
                                .inputs
                                .into_iter()
                                .flat_map(|p| p.batch.parts.clone())
                                .collect();
                            let output = HollowBatch::new_run(req.desc, parts, len);
                            leader
                                .collections
                                .trace
                                .apply_merge_res_unchecked(&FueledMergeRes {
                                    output,
                                    input: CompactionInput::Legacy,
                                    new_active_compaction: None,
                                });
                        }
                    }
                }
                leader.collections.trace.roundtrip_structure = roundtrip_structure;
                leader.seqno.0 += 1;
                let diff = StateDiff::from_diff(&old_leader, &leader);

                // Validate that the diff applies both to the previous leader
                // state and to a follower that has applied every diff so far.
                old_leader
                    .apply_diff(metrics, diff.clone())
                    .expect("diff applies to the old version of the leader state");
                follower
                    .apply_diff(metrics, diff.clone())
                    .expect("diff applies to the synced version of the follower state");
            }
        }

        let config = PersistConfig::new_for_tests();
        let metrics_registry = MetricsRegistry::new();
        let metrics: Metrics = Metrics::new(&config, &metrics_registry);

        proptest!(|(actions in prop::collection::vec((action_gen, any::<bool>()), 1..20))| {
            run(actions, &metrics)
        })
    }

    #[mz_ore::test]
    fn regression_15493_sniff_insert() {
        fn hb(lower: u64, upper: u64, len: usize) -> HollowBatch<u64> {
            HollowBatch::new_run(
                Description::new(
                    Antichain::from_elem(lower),
                    Antichain::from_elem(upper),
                    Antichain::from_elem(0),
                ),
                Vec::new(),
                len,
            )
        }

        let batches_before = [hb(0, 7094664, 0), hb(7094664, 7185234, 100)];

        // The diffs contain both a compaction (the two deletes plus the
        // first insert) and an append (the second insert). Applying them to
        // batches_before must reproduce batches_after.
        let diffs = vec![
            StateFieldDiff {
                key: hb(0, 6805359, 0),
                val: StateFieldValDiff::Delete(()),
            },
            StateFieldDiff {
                key: hb(6805359, 7083793, 0),
                val: StateFieldValDiff::Delete(()),
            },
            StateFieldDiff {
                key: hb(0, 7083793, 0),
                val: StateFieldValDiff::Insert(()),
            },
            StateFieldDiff {
                key: hb(7185234, 7185859, 20),
                val: StateFieldValDiff::Insert(()),
            },
        ];

        let batches_after = vec![
            hb(0, 7094664, 0),
            hb(7094664, 7185234, 100),
            hb(7185234, 7185859, 20),
        ];

        let cfg = PersistConfig::new_for_tests();
        let state = TypedState::<(), (), u64, i64>::new(
            cfg.build_version.clone(),
            ShardId::new(),
            cfg.hostname.clone(),
            (cfg.now)(),
        );
        let state = state.clone_apply(&cfg, &mut |_seqno, _cfg, state| {
            for b in batches_before.iter() {
                let _merge_reqs = state.trace.push_batch(b.clone());
            }
            Continue::<(), ()>(())
        });
        let mut state = match state {
            Continue((_, x)) => x,
            _ => unreachable!(),
        };

        let metrics = Metrics::new(&PersistConfig::new_for_tests(), &MetricsRegistry::new());
        assert_eq!(
            apply_diffs_spine(&metrics, diffs, &mut state.collections.trace),
            Ok(())
        );

        let mut actual = Vec::new();
        state
            .collections
            .trace
            .map_batches(|b| actual.push(b.clone()));
        assert_eq!(actual, batches_after);
    }

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn apply_lenient() {
        #[track_caller]
        fn testcase(
            replacement: (u64, u64, u64, usize),
            spine: &[(u64, u64, u64, usize)],
            expected: Result<&[(u64, u64, u64, usize)], &str>,
        ) {
            fn batch(x: &(u64, u64, u64, usize)) -> HollowBatch<u64> {
                let (lower, upper, since, len) = x;
                let desc = Description::new(
                    Antichain::from_elem(*lower),
                    Antichain::from_elem(*upper),
                    Antichain::from_elem(*since),
                );
                HollowBatch::new_run(desc, Vec::new(), *len)
            }
            let replacement = batch(&replacement);
            let batches = spine.iter().map(batch).collect::<Vec<_>>();

            let metrics = Metrics::new(&PersistConfig::new_for_tests(), &MetricsRegistry::new());
            let actual = apply_compaction_lenient(&metrics, batches, &replacement);
            let expected = match expected {
                Ok(batches) => Ok(batches.iter().map(batch).collect::<Vec<_>>()),
                Err(err) => Err(err.to_owned()),
            };
            assert_eq!(actual, expected);
        }

        // Exact replacement of a run of batches
        testcase(
            (0, 3, 0, 100),
            &[(0, 1, 0, 0), (1, 2, 0, 0), (2, 3, 0, 0)],
            Ok(&[(0, 3, 0, 100)]),
        );

        // Replacement in the middle of a single empty batch
        testcase(
            (1, 2, 0, 100),
            &[(0, 3, 0, 0)],
            Ok(&[(0, 1, 0, 0), (1, 2, 0, 100), (2, 3, 0, 0)]),
        );

        // Replacement lower splits an empty batch
        testcase(
            (2, 4, 0, 100),
            &[(0, 3, 0, 0), (3, 4, 0, 0)],
            Ok(&[(0, 2, 0, 0), (2, 4, 0, 100)]),
        );

        // Replacement lower splits a non-empty batch: error
        testcase(
            (2, 4, 0, 100),
            &[(0, 3, 0, 1), (3, 4, 0, 0)],
            Err(
                "overlapping batch was unexpectedly non-empty: HollowBatch { desc: ([0], [3], [0]), parts: [], len: 1, runs: [], run_meta: [] }",
            ),
        );

        // Replacement lower splits a batch, with an unrelated batch before
        testcase(
            (2, 4, 0, 100),
            &[(0, 1, 0, 0), (1, 3, 0, 0), (3, 4, 0, 0)],
            Ok(&[(0, 1, 0, 0), (1, 2, 0, 0), (2, 4, 0, 100)]),
        );

        // Replacement lower splits a batch, preserving the batch's since
        testcase(
            (2, 4, 0, 100),
            &[(0, 3, 200, 0), (3, 4, 0, 0)],
            Ok(&[(0, 2, 200, 0), (2, 4, 0, 100)]),
        );

        // Replacement upper splits an empty batch
        testcase(
            (0, 2, 0, 100),
            &[(0, 1, 0, 0), (1, 4, 0, 0)],
            Ok(&[(0, 2, 0, 100), (2, 4, 0, 0)]),
        );

        // Replacement upper splits a non-empty batch: error
        testcase(
            (0, 2, 0, 100),
            &[(0, 1, 0, 0), (1, 4, 0, 1)],
            Err(
                "overlapping batch was unexpectedly non-empty: HollowBatch { desc: ([1], [4], [0]), parts: [], len: 1, runs: [], run_meta: [] }",
            ),
        );

        // Replacement upper splits a batch, with an unrelated batch after
        testcase(
            (0, 2, 0, 100),
            &[(0, 1, 0, 0), (1, 3, 0, 0), (3, 4, 0, 0)],
            Ok(&[(0, 2, 0, 100), (2, 3, 0, 0), (3, 4, 0, 0)]),
        );

        // Replacement upper splits a batch, preserving the batch's since
        testcase(
            (0, 2, 0, 100),
            &[(0, 1, 0, 0), (1, 4, 200, 0)],
            Ok(&[(0, 2, 0, 100), (2, 4, 200, 0)]),
        );

        // Replacement splits batches at both its lower and upper
        testcase(
            (2, 6, 0, 100),
            &[(0, 3, 0, 0), (3, 5, 0, 0), (5, 8, 0, 0)],
            Ok(&[(0, 2, 0, 0), (2, 6, 0, 100), (6, 8, 0, 0)]),
        );

        // Replacement doesn't overlap (entirely after the spine)
        testcase(
            (2, 3, 0, 100),
            &[(0, 1, 0, 0)],
            Err("replacement didn't overlap any batches"),
        );

        // Replacement doesn't overlap (entirely before the spine)
        testcase(
            (2, 3, 0, 100),
            &[(4, 5, 0, 0)],
            Err("replacement didn't overlap any batches"),
        );
    }
}