use std::cmp::Ordering;
use std::collections::BTreeMap;
use std::fmt::Debug;
use std::sync::Arc;

use bytes::{Bytes, BytesMut};
use differential_dataflow::lattice::Lattice;
use differential_dataflow::trace::Description;
use mz_ore::assert_none;
use mz_ore::cast::CastFrom;
use mz_persist::location::{SeqNo, VersionedData};
use mz_persist_types::Codec64;
use mz_persist_types::schema::SchemaId;
use mz_proto::TryFromProtoError;
use timely::PartialOrder;
use timely::progress::{Antichain, Timestamp};
use tracing::debug;

use crate::critical::CriticalReaderId;
use crate::internal::paths::PartialRollupKey;
use crate::internal::state::{
    CriticalReaderState, EncodedSchemas, HollowBatch, HollowBlobRef, HollowRollup,
    LeasedReaderState, ProtoStateField, ProtoStateFieldDiffType, ProtoStateFieldDiffs, RunPart,
    State, StateCollections, WriterState,
};
use crate::internal::trace::{FueledMergeRes, SpineId, ThinMerge, ThinSpineBatch, Trace};
use crate::read::LeasedReaderId;
use crate::write::WriterId;
use crate::{Metrics, PersistConfig, ShardId};

use StateFieldValDiff::*;

use super::state::{ActiveGc, ActiveRollup};

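/// A change to the value of a single state field: the value was inserted,
/// updated from one value to another, or deleted.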
#[derive(Clone, Debug)]
#[cfg_attr(any(test, debug_assertions), derive(PartialEq))]
pub enum StateFieldValDiff<V> {
    Insert(V),
    Update(V, V),
    Delete(V),
}

#[derive(Clone)]
#[cfg_attr(any(test, debug_assertions), derive(PartialEq))]
pub struct StateFieldDiff<K, V> {
    pub key: K,
    pub val: StateFieldValDiff<V>,
}

impl<K: Debug, V: Debug> std::fmt::Debug for StateFieldDiff<K, V> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("StateFieldDiff")
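            // Note: `val` is printed before `key`, deviating from the struct's
            // field order; keys (e.g. whole `HollowBatch`es) can be large, so
            // they read better last in debug output.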
            .field("val", &self.val)
            .field("key", &self.key)
            .finish()
    }
}

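/// A set of changes to durable shard state, taking it from the `seqno_from`
/// version to the `seqno_to` version. Each field of state gets its own vector
/// of [StateFieldDiff]s.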
#[derive(Debug)]
#[cfg_attr(any(test, debug_assertions), derive(Clone, PartialEq))]
pub struct StateDiff<T> {
    pub(crate) applier_version: semver::Version,
    pub(crate) seqno_from: SeqNo,
    pub(crate) seqno_to: SeqNo,
    pub(crate) walltime_ms: u64,
    pub(crate) latest_rollup_key: PartialRollupKey,
    pub(crate) rollups: Vec<StateFieldDiff<SeqNo, HollowRollup>>,
    pub(crate) active_rollup: Vec<StateFieldDiff<(), ActiveRollup>>,
    pub(crate) active_gc: Vec<StateFieldDiff<(), ActiveGc>>,
    pub(crate) hostname: Vec<StateFieldDiff<(), String>>,
    pub(crate) last_gc_req: Vec<StateFieldDiff<(), SeqNo>>,
    pub(crate) leased_readers: Vec<StateFieldDiff<LeasedReaderId, LeasedReaderState<T>>>,
    pub(crate) critical_readers: Vec<StateFieldDiff<CriticalReaderId, CriticalReaderState<T>>>,
    pub(crate) writers: Vec<StateFieldDiff<WriterId, WriterState<T>>>,
    pub(crate) schemas: Vec<StateFieldDiff<SchemaId, EncodedSchemas>>,
    pub(crate) since: Vec<StateFieldDiff<(), Antichain<T>>>,
    pub(crate) legacy_batches: Vec<StateFieldDiff<HollowBatch<T>, ()>>,
    pub(crate) hollow_batches: Vec<StateFieldDiff<SpineId, Arc<HollowBatch<T>>>>,
    pub(crate) spine_batches: Vec<StateFieldDiff<SpineId, ThinSpineBatch<T>>>,
    pub(crate) merges: Vec<StateFieldDiff<SpineId, ThinMerge<T>>>,
}

impl<T: Timestamp + Codec64> StateDiff<T> {
    pub fn new(
        applier_version: semver::Version,
        seqno_from: SeqNo,
        seqno_to: SeqNo,
        walltime_ms: u64,
        latest_rollup_key: PartialRollupKey,
    ) -> Self {
        StateDiff {
            applier_version,
            seqno_from,
            seqno_to,
            walltime_ms,
            latest_rollup_key,
            rollups: Vec::default(),
            active_rollup: Vec::default(),
            active_gc: Vec::default(),
            hostname: Vec::default(),
            last_gc_req: Vec::default(),
            leased_readers: Vec::default(),
            critical_readers: Vec::default(),
            writers: Vec::default(),
            schemas: Vec::default(),
            since: Vec::default(),
            legacy_batches: Vec::default(),
            hollow_batches: Vec::default(),
            spine_batches: Vec::default(),
            merges: Vec::default(),
        }
    }

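    /// All batches referenced by this diff, in both the legacy and structured
    /// spine representations, tagged with their insert/update/delete polarity.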
    pub fn referenced_batches(&self) -> impl Iterator<Item = StateFieldValDiff<&HollowBatch<T>>> {
        let legacy_batches = self
            .legacy_batches
            .iter()
            .filter_map(|diff| match diff.val {
                Insert(()) => Some(Insert(&diff.key)),
                Update((), ()) => None,
                Delete(()) => Some(Delete(&diff.key)),
            });
        let hollow_batches = self.hollow_batches.iter().map(|diff| match &diff.val {
            Insert(batch) => Insert(&**batch),
            Update(before, after) => Update(&**before, &**after),
            Delete(batch) => Delete(&**batch),
        });
        legacy_batches.chain(hollow_batches)
    }
}

impl<T: Timestamp + Lattice + Codec64> StateDiff<T> {
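    /// Computes the diff between two consecutive versions of [State]. Both
    /// states are fully destructured, so adding a new field is a compile
    /// error until it's diffed here.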
    pub fn from_diff(from: &State<T>, to: &State<T>) -> Self {
        let State {
            applier_version: _,
            shard_id: from_shard_id,
            seqno: from_seqno,
            hostname: from_hostname,
            walltime_ms: _,
            collections:
                StateCollections {
                    last_gc_req: from_last_gc_req,
                    rollups: from_rollups,
                    active_rollup: from_active_rollup,
                    active_gc: from_active_gc,
                    leased_readers: from_leased_readers,
                    critical_readers: from_critical_readers,
                    writers: from_writers,
                    schemas: from_schemas,
                    trace: from_trace,
                },
        } = from;
        let State {
            applier_version: to_applier_version,
            shard_id: to_shard_id,
            seqno: to_seqno,
            walltime_ms: to_walltime_ms,
            hostname: to_hostname,
            collections:
                StateCollections {
                    last_gc_req: to_last_gc_req,
                    rollups: to_rollups,
                    active_rollup: to_active_rollup,
                    active_gc: to_active_gc,
                    leased_readers: to_leased_readers,
                    critical_readers: to_critical_readers,
                    writers: to_writers,
                    schemas: to_schemas,
                    trace: to_trace,
                },
        } = to;
        assert_eq!(from_shard_id, to_shard_id);

        let (_, latest_rollup) = to.latest_rollup();
        let mut diffs = Self::new(
            to_applier_version.clone(),
            *from_seqno,
            *to_seqno,
            *to_walltime_ms,
            latest_rollup.key.clone(),
        );
        diff_field_single(from_hostname, to_hostname, &mut diffs.hostname);
        diff_field_single(from_last_gc_req, to_last_gc_req, &mut diffs.last_gc_req);
        diff_field_sorted_iter(
            from_active_rollup.iter().map(|r| (&(), r)),
            to_active_rollup.iter().map(|r| (&(), r)),
            &mut diffs.active_rollup,
        );
        diff_field_sorted_iter(
            from_active_gc.iter().map(|g| (&(), g)),
            to_active_gc.iter().map(|g| (&(), g)),
            &mut diffs.active_gc,
        );
        diff_field_sorted_iter(from_rollups.iter(), to_rollups, &mut diffs.rollups);
        diff_field_sorted_iter(
            from_leased_readers.iter(),
            to_leased_readers,
            &mut diffs.leased_readers,
        );
        diff_field_sorted_iter(
            from_critical_readers.iter(),
            to_critical_readers,
            &mut diffs.critical_readers,
        );
        diff_field_sorted_iter(from_writers.iter(), to_writers, &mut diffs.writers);
        diff_field_sorted_iter(from_schemas.iter(), to_schemas, &mut diffs.schemas);
        diff_field_single(from_trace.since(), to_trace.since(), &mut diffs.since);

        let from_flat = from_trace.flatten();
        let to_flat = to_trace.flatten();
        diff_field_sorted_iter(
            from_flat.legacy_batches.iter().map(|(k, v)| (&**k, v)),
            to_flat.legacy_batches.iter().map(|(k, v)| (&**k, v)),
            &mut diffs.legacy_batches,
        );
        diff_field_sorted_iter(
            from_flat.hollow_batches.iter(),
            to_flat.hollow_batches.iter(),
            &mut diffs.hollow_batches,
        );
        diff_field_sorted_iter(
            from_flat.spine_batches.iter(),
            to_flat.spine_batches.iter(),
            &mut diffs.spine_batches,
        );
        diff_field_sorted_iter(
            from_flat.merges.iter(),
            to_flat.merges.iter(),
            &mut diffs.merges,
        );
        diffs
    }

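    /// The blobs (batches and rollups) to which this diff adds references.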
    pub(crate) fn blob_inserts(&self) -> impl Iterator<Item = HollowBlobRef<T>> {
        let batches = self
            .referenced_batches()
            .filter_map(|spine_diff| match spine_diff {
                Insert(b) | Update(_, b) => Some(HollowBlobRef::Batch(b)),
                Delete(_) => None,
            });
        let rollups = self
            .rollups
            .iter()
            .filter_map(|rollups_diff| match &rollups_diff.val {
                StateFieldValDiff::Insert(x) | StateFieldValDiff::Update(_, x) => {
                    Some(HollowBlobRef::Rollup(x))
                }
                StateFieldValDiff::Delete(_) => None,
            });
        batches.chain(rollups)
    }

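    /// The batch parts whose references are removed by this diff, excluding
    /// any that are simultaneously re-added (e.g. when a batch is replaced by
    /// a merged batch containing the same parts).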
    pub(crate) fn part_deletes(&self) -> impl Iterator<Item = &RunPart<T>> {
        let removed = self
            .referenced_batches()
            .filter_map(|spine_diff| match spine_diff {
                Insert(_) => None,
                Update(a, _) | Delete(a) => Some(a.parts.iter().collect::<Vec<_>>()),
            });

        let added: std::collections::BTreeSet<_> = self
            .referenced_batches()
            .filter_map(|spine_diff| match spine_diff {
                Insert(a) | Update(_, a) => Some(a.parts.iter().collect::<Vec<_>>()),
                Delete(_) => None,
            })
            .flatten()
            .collect();

        removed
            .flatten()
            .filter(move |part| !added.contains(part))
    }

    pub(crate) fn rollup_deletes(&self) -> impl Iterator<Item = &HollowRollup> {
        self.rollups
            .iter()
            .filter_map(|rollups_diff| match &rollups_diff.val {
                Insert(_) => None,
                Update(a, _) | Delete(a) => Some(a),
            })
    }

    #[cfg(any(test, debug_assertions))]
    #[allow(dead_code)]
    pub fn validate_roundtrip<K, V, D>(
        metrics: &Metrics,
        from_state: &crate::internal::state::TypedState<K, V, T, D>,
        diff: &Self,
        to_state: &crate::internal::state::TypedState<K, V, T, D>,
    ) -> Result<(), String>
    where
        K: mz_persist_types::Codec + std::fmt::Debug,
        V: mz_persist_types::Codec + std::fmt::Debug,
        D: differential_dataflow::difference::Semigroup + Codec64,
    {
        use mz_proto::RustType;
        use prost::Message;

        use crate::internal::state::ProtoStateDiff;

        let mut roundtrip_state = from_state.clone(
            from_state.applier_version.clone(),
            from_state.hostname.clone(),
        );
        roundtrip_state.apply_diff(metrics, diff.clone())?;

        if &roundtrip_state != to_state {
            return Err(format!(
                "state didn't roundtrip\n from_state {:?}\n to_state {:?}\n rt_state {:?}\n diff {:?}\n",
                from_state, to_state, roundtrip_state, diff
            ));
        }

        let encoded_diff = diff.into_proto().encode_to_vec();
        let roundtrip_diff = Self::from_proto(
            ProtoStateDiff::decode(encoded_diff.as_slice()).map_err(|err| err.to_string())?,
        )
        .map_err(|err| err.to_string())?;

        if &roundtrip_diff != diff {
            return Err(format!(
                "diff didn't roundtrip\n diff {:?}\n rt_diff {:?}",
                diff, roundtrip_diff
            ));
        }

        Ok(())
    }
}

impl<T: Timestamp + Lattice + Codec64> State<T> {
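    /// Decodes and applies a sequence of encoded diffs to this state,
    /// skipping any diff that isn't exactly the next seqno (e.g. because it
    /// has already been applied) and asserting that the remainder form a
    /// contiguous chain.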
    pub fn apply_encoded_diffs<'a, I: IntoIterator<Item = &'a VersionedData>>(
        &mut self,
        cfg: &PersistConfig,
        metrics: &Metrics,
        diffs: I,
    ) {
        let mut state_seqno = self.seqno;
        let diffs = diffs.into_iter().filter_map(move |x| {
            if x.seqno != state_seqno.next() {
                // This diff has either already been applied (seqno too low)
                // or there's a gap before it (seqno too high); skip it.
                return None;
            }
            let data = x.data.clone();
            let diff = metrics
                .codecs
                .state_diff
                .decode(|| StateDiff::decode(&cfg.build_version, x.data.clone()));
            assert_eq!(diff.seqno_from, state_seqno);
            state_seqno = diff.seqno_to;
            Some((diff, data))
        });
        self.apply_diffs(metrics, diffs);
    }
}

impl<T: Timestamp + Lattice + Codec64> State<T> {
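    /// Applies a sequence of (diff, encoded bytes) pairs to this state,
    /// panicking if any diff fails to apply cleanly (which would indicate a
    /// serious bug).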
    pub fn apply_diffs<I: IntoIterator<Item = (StateDiff<T>, Bytes)>>(
        &mut self,
        metrics: &Metrics,
        diffs: I,
    ) {
        for (diff, data) in diffs {
            match self.apply_diff(metrics, diff) {
                Ok(()) => {}
                Err(err) => {
                    // Re-decode the diff from the original bytes so the panic
                    // message can include it (the decoded copy was consumed
                    // by the failed `apply_diff` call).
                    let diff = StateDiff::<T>::decode(&self.applier_version, data);
                    panic!(
                        "state diff should apply cleanly: {} diff {:?} state {:?}",
                        err, diff, self
                    )
                }
            }
        }
    }

    pub(super) fn apply_diff(
        &mut self,
        metrics: &Metrics,
        diff: StateDiff<T>,
    ) -> Result<(), String> {
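        // Deconstruct the diff so we get a compile failure if new fields are
        // added and not handled here.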
        let StateDiff {
            applier_version: diff_applier_version,
            seqno_from: diff_seqno_from,
            seqno_to: diff_seqno_to,
            walltime_ms: diff_walltime_ms,
            latest_rollup_key: _,
            rollups: diff_rollups,
            active_rollup: diff_active_rollup,
            active_gc: diff_active_gc,
            hostname: diff_hostname,
            last_gc_req: diff_last_gc_req,
            leased_readers: diff_leased_readers,
            critical_readers: diff_critical_readers,
            writers: diff_writers,
            schemas: diff_schemas,
            since: diff_since,
            legacy_batches: diff_legacy_batches,
            hollow_batches: diff_hollow_batches,
            spine_batches: diff_spine_batches,
            merges: diff_merges,
        } = diff;
        if self.seqno == diff_seqno_to {
            return Ok(());
        }
        if self.seqno != diff_seqno_from {
            return Err(format!(
                "could not apply diff {} -> {} to state {}",
                diff_seqno_from, diff_seqno_to, self.seqno
            ));
        }
        self.seqno = diff_seqno_to;
        self.applier_version = diff_applier_version;
        self.walltime_ms = diff_walltime_ms;
        force_apply_diffs_single(
            &self.shard_id,
            diff_seqno_to,
            "hostname",
            diff_hostname,
            &mut self.hostname,
            metrics,
        )?;

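        // Deconstruct the collections, again so that new fields are a compile
        // failure until they're handled here.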
        let StateCollections {
            last_gc_req,
            rollups,
            active_rollup,
            active_gc,
            leased_readers,
            critical_readers,
            writers,
            schemas,
            trace,
        } = &mut self.collections;

        apply_diffs_map("rollups", diff_rollups, rollups)?;
        apply_diffs_single("last_gc_req", diff_last_gc_req, last_gc_req)?;
        apply_diffs_single_option("active_rollup", diff_active_rollup, active_rollup)?;
        apply_diffs_single_option("active_gc", diff_active_gc, active_gc)?;
        apply_diffs_map("leased_readers", diff_leased_readers, leased_readers)?;
        apply_diffs_map("critical_readers", diff_critical_readers, critical_readers)?;
        apply_diffs_map("writers", diff_writers, writers)?;
        apply_diffs_map("schemas", diff_schemas, schemas)?;

        let structure_unchanged = diff_hollow_batches.is_empty()
            && diff_spine_batches.is_empty()
            && diff_merges.is_empty();
        let spine_unchanged =
            diff_since.is_empty() && diff_legacy_batches.is_empty() && structure_unchanged;

        if spine_unchanged {
            return Ok(());
        }

        let mut flat = if trace.roundtrip_structure {
            metrics.state.apply_spine_flattened.inc();
            let mut flat = trace.flatten();
            apply_diffs_single("since", diff_since, &mut flat.since)?;
            apply_diffs_map(
                "legacy_batches",
                diff_legacy_batches
                    .into_iter()
                    .map(|StateFieldDiff { key, val }| StateFieldDiff {
                        key: Arc::new(key),
                        val,
                    }),
                &mut flat.legacy_batches,
            )?;
            Some(flat)
        } else {
            for x in diff_since {
                match x.val {
                    Update(from, to) => {
                        if trace.since() != &from {
                            return Err(format!(
                                "since update didn't match: {:?} vs {:?}",
                                self.collections.trace.since(),
                                &from
                            ));
                        }
                        trace.downgrade_since(&to);
                    }
                    Insert(_) => return Err("cannot insert since field".to_string()),
                    Delete(_) => return Err("cannot delete since field".to_string()),
                }
            }
            if !diff_legacy_batches.is_empty() {
                apply_diffs_spine(metrics, diff_legacy_batches, trace)?;
                debug_assert_eq!(trace.validate(), Ok(()), "{:?}", trace);
            }
            None
        };

        if !structure_unchanged {
            let flat = flat.get_or_insert_with(|| trace.flatten());
            apply_diffs_map(
                "hollow_batches",
                diff_hollow_batches,
                &mut flat.hollow_batches,
            )?;
            apply_diffs_map("spine_batches", diff_spine_batches, &mut flat.spine_batches)?;
            apply_diffs_map("merges", diff_merges, &mut flat.merges)?;
        }

        if let Some(flat) = flat {
            *trace = Trace::unflatten(flat)?;
        }

        Ok(())
    }
}

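/// Records a change to a single-valued field as an `Update` diff, if the two
/// values differ.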
fn diff_field_single<T: PartialEq + Clone>(
    from: &T,
    to: &T,
    diffs: &mut Vec<StateFieldDiff<(), T>>,
) {
    if from != to {
        diffs.push(StateFieldDiff {
            key: (),
            val: Update(from.clone(), to.clone()),
        })
    }
}

fn apply_diffs_single_option<X: PartialEq + Debug>(
    name: &str,
    diffs: Vec<StateFieldDiff<(), X>>,
    single: &mut Option<X>,
) -> Result<(), String> {
    for diff in diffs {
        apply_diff_single_option(name, diff, single)?;
    }
    Ok(())
}

fn apply_diff_single_option<X: PartialEq + Debug>(
    name: &str,
    diff: StateFieldDiff<(), X>,
    single: &mut Option<X>,
) -> Result<(), String> {
    match diff.val {
        Update(from, to) => {
            if single.as_ref() != Some(&from) {
                return Err(format!(
                    "{} update didn't match: {:?} vs {:?}",
                    name, single, &from
                ));
            }
            *single = Some(to)
        }
        Insert(to) => {
            if single.is_some() {
                return Err(format!("{} insert found existing value", name));
            }
            *single = Some(to)
        }
        Delete(from) => {
            if single.as_ref() != Some(&from) {
                return Err(format!(
                    "{} delete didn't match: {:?} vs {:?}",
                    name, single, &from
                ));
            }
            *single = None
        }
    }
    Ok(())
}

fn apply_diffs_single<X: PartialEq + Debug>(
    name: &str,
    diffs: Vec<StateFieldDiff<(), X>>,
    single: &mut X,
) -> Result<(), String> {
    for diff in diffs {
        apply_diff_single(name, diff, single)?;
    }
    Ok(())
}

fn apply_diff_single<X: PartialEq + Debug>(
    name: &str,
    diff: StateFieldDiff<(), X>,
    single: &mut X,
) -> Result<(), String> {
    match diff.val {
        Update(from, to) => {
            if single != &from {
                return Err(format!(
                    "{} update didn't match: {:?} vs {:?}",
                    name, single, &from
                ));
            }
            *single = to
        }
        Insert(_) => return Err(format!("cannot insert {} field", name)),
        Delete(_) => return Err(format!("cannot delete {} field", name)),
    }
    Ok(())
}

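// Unlike the plain `apply_diff_single`, a mismatched `Update` here is logged
// and force-applied rather than returned as an error. In practice this is
// only used for the hostname field (note the `force_apply_hostname` metric
// below), which doesn't affect correctness.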
fn force_apply_diffs_single<X: PartialEq + Debug>(
    shard_id: &ShardId,
    seqno: SeqNo,
    name: &str,
    diffs: Vec<StateFieldDiff<(), X>>,
    single: &mut X,
    metrics: &Metrics,
) -> Result<(), String> {
    for diff in diffs {
        force_apply_diff_single(shard_id, seqno, name, diff, single, metrics)?;
    }
    Ok(())
}

fn force_apply_diff_single<X: PartialEq + Debug>(
    shard_id: &ShardId,
    seqno: SeqNo,
    name: &str,
    diff: StateFieldDiff<(), X>,
    single: &mut X,
    metrics: &Metrics,
) -> Result<(), String> {
    match diff.val {
        Update(from, to) => {
            if single != &from {
                debug!(
                    "{}: update didn't match: {:?} vs {:?}, continuing to force apply diff to {:?} for shard {} and seqno {}",
                    name, single, &from, &to, shard_id, seqno
                );
                metrics.state.force_apply_hostname.inc();
            }
            *single = to
        }
        Insert(_) => return Err(format!("cannot insert {} field", name)),
        Delete(_) => return Err(format!("cannot delete {} field", name)),
    }
    Ok(())
}

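/// Computes the diff between two sorted key-value iterators in a single
/// merging pass: a `Delete` for keys only in `from`, an `Insert` for keys
/// only in `to`, and an `Update` for keys in both whose values differ. For
/// example, diffing `{1: 'a', 2: 'b'}` against `{2: 'c', 3: 'd'}` yields
/// `Delete(1: 'a')`, `Update(2: 'b' -> 'c')`, and `Insert(3: 'd')`.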
fn diff_field_sorted_iter<'a, K, V, IF, IT>(from: IF, to: IT, diffs: &mut Vec<StateFieldDiff<K, V>>)
where
    K: Ord + Clone + 'a,
    V: PartialEq + Clone + 'a,
    IF: IntoIterator<Item = (&'a K, &'a V)>,
    IT: IntoIterator<Item = (&'a K, &'a V)>,
{
    let (mut from, mut to) = (from.into_iter(), to.into_iter());
    let (mut f, mut t) = (from.next(), to.next());
    loop {
        match (f, t) {
            (None, None) => break,
            (Some((fk, fv)), Some((tk, tv))) => match fk.cmp(tk) {
                Ordering::Less => {
                    diffs.push(StateFieldDiff {
                        key: fk.clone(),
                        val: Delete(fv.clone()),
                    });
                    let f_next = from.next();
                    debug_assert!(f_next.as_ref().map_or(true, |(fk_next, _)| fk_next > &fk));
                    f = f_next;
                }
                Ordering::Greater => {
                    diffs.push(StateFieldDiff {
                        key: tk.clone(),
                        val: Insert(tv.clone()),
                    });
                    let t_next = to.next();
                    debug_assert!(t_next.as_ref().map_or(true, |(tk_next, _)| tk_next > &tk));
                    t = t_next;
                }
                Ordering::Equal => {
                    if fv != tv {
                        diffs.push(StateFieldDiff {
                            key: fk.clone(),
                            val: Update(fv.clone(), tv.clone()),
                        });
                    }
                    let f_next = from.next();
                    debug_assert!(f_next.as_ref().map_or(true, |(fk_next, _)| fk_next > &fk));
                    f = f_next;
                    let t_next = to.next();
                    debug_assert!(t_next.as_ref().map_or(true, |(tk_next, _)| tk_next > &tk));
                    t = t_next;
                }
            },
            (None, Some((tk, tv))) => {
                diffs.push(StateFieldDiff {
                    key: tk.clone(),
                    val: Insert(tv.clone()),
                });
                let t_next = to.next();
                debug_assert!(t_next.as_ref().map_or(true, |(tk_next, _)| tk_next > &tk));
                t = t_next;
            }
            (Some((fk, fv)), None) => {
                diffs.push(StateFieldDiff {
                    key: fk.clone(),
                    val: Delete(fv.clone()),
                });
                let f_next = from.next();
                debug_assert!(f_next.as_ref().map_or(true, |(fk_next, _)| fk_next > &fk));
                f = f_next;
            }
        }
    }
}

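/// Applies a sequence of field diffs to a map, verifying that each diff's
/// expected previous value matches what is actually in the map.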
fn apply_diffs_map<K: Ord, V: PartialEq + Debug>(
    name: &str,
    diffs: impl IntoIterator<Item = StateFieldDiff<K, V>>,
    map: &mut BTreeMap<K, V>,
) -> Result<(), String> {
    for diff in diffs {
        apply_diff_map(name, diff, map)?;
    }
    Ok(())
}

fn apply_diff_map<K: Ord, V: PartialEq + Debug>(
    name: &str,
    diff: StateFieldDiff<K, V>,
    map: &mut BTreeMap<K, V>,
) -> Result<(), String> {
    match diff.val {
        Insert(to) => {
            let prev = map.insert(diff.key, to);
            if prev.is_some() {
                return Err(format!("{} insert found existing value: {:?}", name, prev));
            }
        }
        Update(from, to) => {
            let prev = map.insert(diff.key, to);
            if prev.as_ref() != Some(&from) {
                return Err(format!(
                    "{} update didn't match: {:?} vs {:?}",
                    name,
                    prev,
                    Some(from),
                ));
            }
        }
        Delete(from) => {
            let prev = map.remove(&diff.key);
            if prev.as_ref() != Some(&from) {
                return Err(format!(
                    "{} delete didn't match: {:?} vs {:?}",
                    name,
                    prev,
                    Some(from),
                ));
            }
        }
    };
    Ok(())
}

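/// Applies legacy spine batch diffs to a trace. This tries a series of fast
/// paths first: a pure append at the current upper, an empty batch whose
/// upper was extended, and a compaction (one insert replacing a set of
/// deletes). If none of these apply cleanly, it falls back to recomputing the
/// full set of batches and rebuilding the spine from scratch.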
fn apply_diffs_spine<T: Timestamp + Lattice + Codec64>(
    metrics: &Metrics,
    mut diffs: Vec<StateFieldDiff<HollowBatch<T>, ()>>,
    trace: &mut Trace<T>,
) -> Result<(), String> {
    // Fast path: a single insert of a batch at the current upper, i.e. a
    // normal append.
    if let Some(insert) = sniff_insert(&mut diffs, trace.upper()) {
        let () = trace.push_batch_no_merge_reqs(insert);
        if diffs.is_empty() {
            metrics.state.apply_spine_fast_path.inc();
            return Ok(());
        }
    }

    match &diffs[..] {
        [] => return Ok(()),
        // Fast path: an empty batch had its upper extended, which shows up as
        // deleting the old empty batch and inserting a wider one. Push an
        // empty filler batch covering the difference; any `since` works for
        // it because it contains no updates.
        [
            StateFieldDiff {
                key: del,
                val: StateFieldValDiff::Delete(()),
            },
            StateFieldDiff {
                key: ins,
                val: StateFieldValDiff::Insert(()),
            },
        ] => {
            if del.is_empty()
                && ins.is_empty()
                && del.desc.lower() == ins.desc.lower()
                && PartialOrder::less_than(del.desc.upper(), ins.desc.upper())
            {
                let () = trace.push_batch_no_merge_reqs(HollowBatch::empty(Description::new(
                    del.desc.upper().clone(),
                    ins.desc.upper().clone(),
                    Antichain::from_elem(T::minimum()),
                )));
                metrics.state.apply_spine_fast_path.inc();
                return Ok(());
            }
        }
        _ => {}
    }

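    // Fast path: the diffs look like the result of a compaction, i.e. a
    // single insert (the output) replacing some set of deleted input batches.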
    if let Some((_inputs, output)) = sniff_compaction(&diffs) {
        let res = FueledMergeRes { output };
        if trace.apply_merge_res_unchecked(&res).applied() {
            metrics.state.apply_spine_fast_path.inc();
            return Ok(());
        }

        // Spine may have restructured its batches since the diff was written,
        // so try applying the compaction leniently against a flattened copy
        // of the trace.
        let mut batches = Vec::new();
        trace.map_batches(|b| batches.push(b.clone()));

        match apply_compaction_lenient(metrics, batches, &res.output) {
            Ok(batches) => {
                let mut new_trace = Trace::default();
                new_trace.roundtrip_structure = trace.roundtrip_structure;
                new_trace.downgrade_since(trace.since());
                for batch in batches {
                    let () = new_trace.push_batch_no_merge_reqs(batch.clone());
                }
                *trace = new_trace;
                metrics.state.apply_spine_slow_path_lenient.inc();
                return Ok(());
            }
            Err(err) => {
                return Err(format!(
                    "lenient compaction result apply unexpectedly failed: {}",
                    err
                ));
            }
        }
    }

    metrics.state.apply_spine_slow_path.inc();
    debug!(
        "apply_diffs_spine didn't hit a fast-path diffs={:?} trace={:?}",
        diffs, trace
    );

    let batches = {
        let mut batches = BTreeMap::new();
        trace.map_batches(|b| assert_none!(batches.insert(b.clone(), ())));
        apply_diffs_map("spine", diffs.clone(), &mut batches).map(|_ok| batches)
    };

    let batches = match batches {
        Ok(batches) => batches,
        Err(err) => {
            metrics
                .state
                .apply_spine_slow_path_with_reconstruction
                .inc();
            debug!(
                "apply_diffs_spine could not apply diffs directly to existing trace batches: {}. diffs={:?} trace={:?}",
                err, diffs, trace
            );
            // The trace's internal batch boundaries may have diverged from
            // the ones the diff was written against; rebuild a fresh spine
            // from the same batches and retry the diffs against that.
            let mut reconstructed_spine = Trace::default();
            reconstructed_spine.roundtrip_structure = trace.roundtrip_structure;
            trace.map_batches(|b| {
                let () = reconstructed_spine.push_batch_no_merge_reqs(b.clone());
            });

            let mut batches = BTreeMap::new();
            reconstructed_spine.map_batches(|b| assert_none!(batches.insert(b.clone(), ())));
            apply_diffs_map("spine", diffs, &mut batches)?;
            batches
        }
    };

    let mut new_trace = Trace::default();
    new_trace.roundtrip_structure = trace.roundtrip_structure;
    new_trace.downgrade_since(trace.since());
    for (batch, ()) in batches {
        let () = new_trace.push_batch_no_merge_reqs(batch);
    }
    *trace = new_trace;
    Ok(())
}

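/// Extracts (and removes from `diffs`) an inserted batch whose lower matches
/// the trace's current upper, if one exists: the signature of an append.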
fn sniff_insert<T: Timestamp + Lattice>(
    diffs: &mut Vec<StateFieldDiff<HollowBatch<T>, ()>>,
    upper: &Antichain<T>,
) -> Option<HollowBatch<T>> {
    for idx in 0..diffs.len() {
        match &diffs[idx] {
            StateFieldDiff {
                key,
                val: StateFieldValDiff::Insert(()),
            } if key.desc.lower() == upper => return Some(diffs.remove(idx).key),
            _ => continue,
        }
    }
    None
}

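/// Returns the (inputs, output) of a compaction, if these diffs look like
/// one: exactly one insert (the output) with every other diff a delete (the
/// inputs). Returns `None` otherwise.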
fn sniff_compaction<'a, T: Timestamp + Lattice>(
    diffs: &'a [StateFieldDiff<HollowBatch<T>, ()>],
) -> Option<(Vec<&'a HollowBatch<T>>, HollowBatch<T>)> {
    // There should be exactly one insert: the compaction output.
    let mut inserts = diffs.iter().filter_map(|x| match x.val {
        StateFieldValDiff::Insert(()) => Some(&x.key),
        _ => None,
    });
    let compaction_output = inserts.next()?;
    if inserts.next().is_some() {
        return None;
    }

    // Every other diff should be a delete: the compaction inputs.
    let mut compaction_inputs = Vec::with_capacity(diffs.len() - 1);
    for diff in diffs.iter() {
        match diff.val {
            StateFieldValDiff::Delete(()) => {
                compaction_inputs.push(&diff.key);
            }
            StateFieldValDiff::Insert(()) => {}
            StateFieldValDiff::Update((), ()) => {
                return None;
            }
        }
    }

    Some((compaction_inputs, compaction_output.clone()))
}

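/// Applies a compaction replacement to a flat list of batches, leniently: the
/// replacement may partially overlap a batch at either edge, as long as that
/// batch is empty, in which case an empty filler batch is synthesized so the
/// batch descriptions stay contiguous. Returns the new sorted list of
/// batches, or an error if the replacement can't be applied.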
fn apply_compaction_lenient<'a, T: Timestamp + Lattice>(
    metrics: &Metrics,
    mut trace: Vec<HollowBatch<T>>,
    replacement: &'a HollowBatch<T>,
) -> Result<Vec<HollowBatch<T>>, String> {
    let mut overlapping_batches = Vec::new();
    trace.retain(|b| {
        let before_replacement = PartialOrder::less_equal(b.desc.upper(), replacement.desc.lower());
        let after_replacement = PartialOrder::less_equal(replacement.desc.upper(), b.desc.lower());
        let overlaps_replacement = !(before_replacement || after_replacement);
        if overlaps_replacement {
            overlapping_batches.push(b.clone());
            false
        } else {
            true
        }
    });

    {
        let first_overlapping_batch = match overlapping_batches.first() {
            Some(x) => x,
            None => return Err("replacement didn't overlap any batches".into()),
        };
        if PartialOrder::less_than(
            first_overlapping_batch.desc.lower(),
            replacement.desc.lower(),
        ) {
            if first_overlapping_batch.len > 0 {
                return Err(format!(
                    "overlapping batch was unexpectedly non-empty: {:?}",
                    first_overlapping_batch
                ));
            }
            // Fill the gap between the first overlapping batch's lower and
            // the replacement's lower with an empty batch.
            let desc = Description::new(
                first_overlapping_batch.desc.lower().clone(),
                replacement.desc.lower().clone(),
                first_overlapping_batch.desc.since().clone(),
            );
            trace.push(HollowBatch::empty(desc));
            metrics.state.apply_spine_slow_path_lenient_adjustment.inc();
        }
    }

    {
        let last_overlapping_batch = match overlapping_batches.last() {
            Some(x) => x,
            None => return Err("replacement didn't overlap any batches".into()),
        };
        if PartialOrder::less_than(
            replacement.desc.upper(),
            last_overlapping_batch.desc.upper(),
        ) {
            if last_overlapping_batch.len > 0 {
                return Err(format!(
                    "overlapping batch was unexpectedly non-empty: {:?}",
                    last_overlapping_batch
                ));
            }
            // Likewise fill the gap between the replacement's upper and the
            // last overlapping batch's upper.
            let desc = Description::new(
                replacement.desc.upper().clone(),
                last_overlapping_batch.desc.upper().clone(),
                last_overlapping_batch.desc.since().clone(),
            );
            trace.push(HollowBatch::empty(desc));
            metrics.state.apply_spine_slow_path_lenient_adjustment.inc();
        }
    }
    trace.push(replacement.clone());

    // Re-sort the batches by lower, then sanity-check that they tile the full
    // range starting at the minimum timestamp, with no gaps or overlaps.
    trace.sort_by(|a, b| a.desc.lower().elements().cmp(b.desc.lower().elements()));
    let mut expected_lower = &Antichain::from_elem(T::minimum());
    for b in trace.iter() {
        if b.desc.lower() != expected_lower {
            return Err(format!(
                "lower {:?} did not match expected {:?}: {:?}",
                b.desc.lower(),
                expected_lower,
                trace
            ));
        }
        expected_lower = b.desc.upper();
    }
    Ok(trace)
}

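/// A builder for [ProtoStateFieldDiffs] that encodes all diff data into a
/// single contiguous buffer, recording per-diff lengths instead of allocating
/// per diff.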
#[derive(Debug)]
pub struct ProtoStateFieldDiffsWriter {
    data_buf: BytesMut,
    proto: ProtoStateFieldDiffs,
}

impl ProtoStateFieldDiffsWriter {
    /// Record the [ProtoStateField] for a diff.
    pub fn push_field(&mut self, field: ProtoStateField) {
        self.proto.fields.push(i32::from(field));
    }

    /// Record the [ProtoStateFieldDiffType] for a diff.
    pub fn push_diff_type(&mut self, diff_type: ProtoStateFieldDiffType) {
        self.proto.diff_types.push(i32::from(diff_type));
    }

    /// Encode a message into the data buffer, recording its length.
    pub fn encode_proto<M: prost::Message>(&mut self, msg: &M) {
        let len_before = self.data_buf.len();
        self.data_buf.reserve(msg.encoded_len());

        // Note: `encode_raw` panics on insufficient capacity, which can't
        // happen here because we just reserved enough space above.
        msg.encode_raw(&mut self.data_buf);

        let written_len = self.data_buf.len() - len_before;
        self.proto.data_lens.push(u64::cast_from(written_len));
    }

    pub fn into_proto(self) -> ProtoStateFieldDiffs {
        let ProtoStateFieldDiffsWriter {
            data_buf,
            mut proto,
        } = self;

        // `into_writer` moved any existing bytes into `data_buf`, so this
        // assignment must not clobber anything.
        assert!(proto.data_bytes.is_empty());

        let data_bytes = data_buf.freeze();
        proto.data_bytes = data_bytes;

        proto
    }
}

impl ProtoStateFieldDiffs {
    pub fn into_writer(mut self) -> ProtoStateFieldDiffsWriter {
        // Create a mutable buffer and move any existing encoded data into it,
        // so the writer can append.
        let mut data_buf = BytesMut::with_capacity(self.data_bytes.len());
        let existing_data = std::mem::take(&mut self.data_bytes);
        data_buf.extend(existing_data);

        ProtoStateFieldDiffsWriter {
            data_buf,
            proto: self,
        }
    }

    pub fn iter<'a>(&'a self) -> ProtoStateFieldDiffsIter<'a> {
        let len = self.fields.len();
        assert_eq!(self.diff_types.len(), len);

        ProtoStateFieldDiffsIter {
            len,
            diff_idx: 0,
            data_idx: 0,
            data_offset: 0,
            diffs: self,
        }
    }

    pub fn validate(&self) -> Result<(), String> {
        if self.fields.len() != self.diff_types.len() {
            return Err(format!(
                "fields {} and diff_types {} lengths disagree",
                self.fields.len(),
                self.diff_types.len()
            ));
        }

        let mut expected_data_slices = 0;
        for diff_type in self.diff_types.iter() {
            // Every diff has a key, plus one value slice for inserts and
            // deletes and two for updates.
            expected_data_slices += 1;
            match ProtoStateFieldDiffType::try_from(*diff_type) {
                Ok(ProtoStateFieldDiffType::Insert) => expected_data_slices += 1,
                Ok(ProtoStateFieldDiffType::Update) => expected_data_slices += 2,
                Ok(ProtoStateFieldDiffType::Delete) => expected_data_slices += 1,
                Err(_) => return Err(format!("unknown diff_type {}", diff_type)),
            }
        }
        if expected_data_slices != self.data_lens.len() {
            return Err(format!(
                "expected {} data slices got {}",
                expected_data_slices,
                self.data_lens.len()
            ));
        }

        let expected_data_bytes = usize::cast_from(self.data_lens.iter().copied().sum::<u64>());
        if expected_data_bytes != self.data_bytes.len() {
            return Err(format!(
                "expected {} data bytes got {}",
                expected_data_bytes,
                self.data_bytes.len()
            ));
        }

        Ok(())
    }
}

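/// A single decoded diff, as borrowed slices into a [ProtoStateFieldDiffs]'s
/// shared data buffer. For inserts `from` is empty; for deletes `to` is
/// empty.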
#[derive(Debug)]
pub struct ProtoStateFieldDiff<'a> {
    pub key: &'a [u8],
    pub diff_type: ProtoStateFieldDiffType,
    pub from: &'a [u8],
    pub to: &'a [u8],
}

pub struct ProtoStateFieldDiffsIter<'a> {
    len: usize,
    diff_idx: usize,
    data_idx: usize,
    data_offset: usize,
    diffs: &'a ProtoStateFieldDiffs,
}

impl<'a> Iterator for ProtoStateFieldDiffsIter<'a> {
    type Item = Result<(ProtoStateField, ProtoStateFieldDiff<'a>), TryFromProtoError>;

    fn next(&mut self) -> Option<Self::Item> {
        if self.diff_idx >= self.len {
            return None;
        }
        // Pull the next length-prefixed slice out of the shared data buffer.
        let mut next_data = || {
            let start = self.data_offset;
            let end = start + usize::cast_from(self.diffs.data_lens[self.data_idx]);
            let data = &self.diffs.data_bytes[start..end];
            self.data_idx += 1;
            self.data_offset = end;
            data
        };
        let field = match ProtoStateField::try_from(self.diffs.fields[self.diff_idx]) {
            Ok(x) => x,
            Err(_) => {
                return Some(Err(TryFromProtoError::unknown_enum_variant(format!(
                    "ProtoStateField({})",
                    self.diffs.fields[self.diff_idx]
                ))));
            }
        };
        let diff_type =
            match ProtoStateFieldDiffType::try_from(self.diffs.diff_types[self.diff_idx]) {
                Ok(x) => x,
                Err(_) => {
                    return Some(Err(TryFromProtoError::unknown_enum_variant(format!(
                        "ProtoStateFieldDiffType({})",
                        self.diffs.diff_types[self.diff_idx]
                    ))));
                }
            };
        let key = next_data();
        let (from, to): (&[u8], &[u8]) = match diff_type {
            ProtoStateFieldDiffType::Insert => (&[], next_data()),
            ProtoStateFieldDiffType::Update => (next_data(), next_data()),
            ProtoStateFieldDiffType::Delete => (next_data(), &[]),
        };
        let diff = ProtoStateFieldDiff {
            key,
            diff_type,
            from,
            to,
        };
        self.diff_idx += 1;
        Some(Ok((field, diff)))
    }
}

#[cfg(test)]
mod tests {
    use semver::Version;
    use std::ops::ControlFlow::Continue;

    use crate::internal::paths::{PartId, PartialBatchKey, RollupId, WriterKey};
    use mz_ore::metrics::MetricsRegistry;

    use crate::ShardId;
    use crate::internal::state::TypedState;

    use super::*;

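    // A minimal illustrative check (not tied to any regression) of the diff
    // shapes `diff_field_sorted_iter` emits: keys only in `from` become
    // deletes, keys only in `to` become inserts, and changed values become
    // updates. The map contents here are arbitrary.
    #[mz_ore::test]
    fn diff_field_sorted_iter_example() {
        let from = BTreeMap::from([(1, 'a'), (2, 'b')]);
        let to = BTreeMap::from([(2, 'c'), (3, 'd')]);
        let mut diffs = Vec::new();
        diff_field_sorted_iter(from.iter(), to.iter(), &mut diffs);
        let expected = vec![
            StateFieldDiff {
                key: 1,
                val: Delete('a'),
            },
            StateFieldDiff {
                key: 2,
                val: Update('b', 'c'),
            },
            StateFieldDiff {
                key: 3,
                val: Insert('d'),
            },
        ];
        assert_eq!(diffs, expected);
    }
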
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn test_state_sync() {
        use proptest::prelude::*;

        #[derive(Debug, Clone)]
        enum Action {
            // Append a batch covering `time_delta` units of time.
            Append { empty: bool, time_delta: u64 },
            // Apply one of the outstanding merge requests.
            Compact { req: usize },
        }

        let action_gen: BoxedStrategy<Action> = {
            prop::strategy::Union::new([
                (any::<bool>(), 1u64..10u64)
                    .prop_map(|(empty, time_delta)| Action::Append { empty, time_delta })
                    .boxed(),
                (0usize..10usize)
                    .prop_map(|req| Action::Compact { req })
                    .boxed(),
            ])
            .boxed()
        };

        fn run(actions: Vec<(Action, bool)>, metrics: &Metrics) {
            let version = Version::new(0, 100, 0);
            let writer_key = WriterKey::Version(version.to_string());
            let id = ShardId::new();
            let hostname = "computer";
            let typed: TypedState<String, (), u64, i64> =
                TypedState::new(version, id, hostname.to_string(), 0);
            let mut leader = typed.state;

            let seqno = SeqNo::minimum();
            let mut lower = 0u64;
            let mut merge_reqs = vec![];

            leader.collections.rollups.insert(
                seqno,
                HollowRollup {
                    key: PartialRollupKey::new(seqno, &RollupId::new()),
                    encoded_size_bytes: None,
                },
            );
            leader.collections.trace.roundtrip_structure = false;
            let mut follower = leader.clone();

            for (action, roundtrip_structure) in actions {
                // Apply the action and the new roundtrip_structure setting to
                // the leader, then diff against the previous state.
                let mut old_leader = leader.clone();
                match action {
                    Action::Append { empty, time_delta } => {
                        let upper = lower + time_delta;
                        let key = if empty {
                            None
                        } else {
                            let id = PartId::new();
                            Some(PartialBatchKey::new(&writer_key, &id))
                        };

                        let keys = key.as_ref().map(|k| k.0.as_str());
                        let reqs = leader.collections.trace.push_batch(
                            crate::internal::state::tests::hollow(
                                lower,
                                upper,
                                keys.as_slice(),
                                if empty { 0 } else { 1 },
                            ),
                        );
                        merge_reqs.extend(reqs);
                        lower = upper;
                    }
                    Action::Compact { req } => {
                        if !merge_reqs.is_empty() {
                            let req = merge_reqs.remove(req.min(merge_reqs.len() - 1));
                            let len = req.inputs.iter().map(|p| p.batch.len).sum();
                            let parts = req
                                .inputs
                                .into_iter()
                                .flat_map(|p| p.batch.parts.clone())
                                .collect();
                            let output = HollowBatch::new_run(req.desc, parts, len);
                            leader
                                .collections
                                .trace
                                .apply_merge_res_unchecked(&FueledMergeRes { output });
                        }
                    }
                }
                leader.collections.trace.roundtrip_structure = roundtrip_structure;
                leader.seqno.0 += 1;
                let diff = StateDiff::from_diff(&old_leader, &leader);

                // The diff should apply both to the state it was generated
                // from and to the follower's synced copy.
                old_leader
                    .apply_diff(metrics, diff.clone())
                    .expect("diff applies to the old version of the leader state");
                follower
                    .apply_diff(metrics, diff.clone())
                    .expect("diff applies to the synced version of the follower state");
            }
        }

        let config = PersistConfig::new_for_tests();
        let metrics_registry = MetricsRegistry::new();
        let metrics: Metrics = Metrics::new(&config, &metrics_registry);

        proptest!(|(actions in prop::collection::vec((action_gen, any::<bool>()), 1..20))| {
            run(actions, &metrics)
        })
    }

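    // Regression test for a case where a single spine diff contains both a
    // compaction (two deletes plus a merged insert) and an append at the
    // current upper: `sniff_insert` must pull out the append so that the
    // remaining compaction diffs can still be applied.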
    #[mz_ore::test]
    fn regression_15493_sniff_insert() {
        fn hb(lower: u64, upper: u64, len: usize) -> HollowBatch<u64> {
            HollowBatch::new_run(
                Description::new(
                    Antichain::from_elem(lower),
                    Antichain::from_elem(upper),
                    Antichain::from_elem(0),
                ),
                Vec::new(),
                len,
            )
        }

        let batches_before = vec![hb(0, 7094664, 0), hb(7094664, 7185234, 100)];

        let diffs = vec![
            StateFieldDiff {
                key: hb(0, 6805359, 0),
                val: StateFieldValDiff::Delete(()),
            },
            StateFieldDiff {
                key: hb(6805359, 7083793, 0),
                val: StateFieldValDiff::Delete(()),
            },
            StateFieldDiff {
                key: hb(0, 7083793, 0),
                val: StateFieldValDiff::Insert(()),
            },
            StateFieldDiff {
                key: hb(7185234, 7185859, 20),
                val: StateFieldValDiff::Insert(()),
            },
        ];

        let batches_after = vec![
            hb(0, 7094664, 0),
            hb(7094664, 7185234, 100),
            hb(7185234, 7185859, 20),
        ];

        let cfg = PersistConfig::new_for_tests();
        let state = TypedState::<(), (), u64, i64>::new(
            cfg.build_version.clone(),
            ShardId::new(),
            cfg.hostname.clone(),
            (cfg.now)(),
        );
        let state = state.clone_apply(&cfg, &mut |_seqno, _cfg, state| {
            for b in batches_before.iter() {
                let _merge_reqs = state.trace.push_batch(b.clone());
            }
            Continue::<(), ()>(())
        });
        let mut state = match state {
            Continue((_, x)) => x,
            _ => unreachable!(),
        };

        let metrics = Metrics::new(&PersistConfig::new_for_tests(), &MetricsRegistry::new());
        assert_eq!(
            apply_diffs_spine(&metrics, diffs, &mut state.collections.trace),
            Ok(())
        );

        let mut actual = Vec::new();
        state
            .collections
            .trace
            .map_batches(|b| actual.push(b.clone()));
        assert_eq!(actual, batches_after);
    }

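    // Exercises `apply_compaction_lenient` directly. Each tuple below
    // describes a batch as (lower, upper, since, len).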
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn apply_lenient() {
        #[track_caller]
        fn testcase(
            replacement: (u64, u64, u64, usize),
            spine: &[(u64, u64, u64, usize)],
            expected: Result<&[(u64, u64, u64, usize)], &str>,
        ) {
            fn batch(x: &(u64, u64, u64, usize)) -> HollowBatch<u64> {
                let (lower, upper, since, len) = x;
                let desc = Description::new(
                    Antichain::from_elem(*lower),
                    Antichain::from_elem(*upper),
                    Antichain::from_elem(*since),
                );
                HollowBatch::new_run(desc, Vec::new(), *len)
            }
            let replacement = batch(&replacement);
            let batches = spine.iter().map(batch).collect::<Vec<_>>();

            let metrics = Metrics::new(&PersistConfig::new_for_tests(), &MetricsRegistry::new());
            let actual = apply_compaction_lenient(&metrics, batches, &replacement);
            let expected = match expected {
                Ok(batches) => Ok(batches.iter().map(batch).collect::<Vec<_>>()),
                Err(err) => Err(err.to_owned()),
            };
            assert_eq!(actual, expected);
        }

        // The replacement exactly covers a set of batches.
        testcase(
            (0, 3, 0, 100),
            &[(0, 1, 0, 0), (1, 2, 0, 0), (2, 3, 0, 0)],
            Ok(&[(0, 3, 0, 100)]),
        );

        // The replacement lands in the middle of a single empty batch.
        testcase(
            (1, 2, 0, 100),
            &[(0, 3, 0, 0)],
            Ok(&[(0, 1, 0, 0), (1, 2, 0, 100), (2, 3, 0, 0)]),
        );

        // The replacement partially overlaps an empty batch on its lower side.
        testcase(
            (2, 4, 0, 100),
            &[(0, 3, 0, 0), (3, 4, 0, 0)],
            Ok(&[(0, 2, 0, 0), (2, 4, 0, 100)]),
        );

        // Partial overlap with a non-empty batch is an error.
        testcase(
            (2, 4, 0, 100),
            &[(0, 3, 0, 1), (3, 4, 0, 0)],
            Err(
                "overlapping batch was unexpectedly non-empty: HollowBatch { desc: ([0], [3], [0]), parts: [], len: 1, runs: [], run_meta: [] }",
            ),
        );

        // Lower-side partial overlap, with another batch before it.
        testcase(
            (2, 4, 0, 100),
            &[(0, 1, 0, 0), (1, 3, 0, 0), (3, 4, 0, 0)],
            Ok(&[(0, 1, 0, 0), (1, 2, 0, 0), (2, 4, 0, 100)]),
        );

        // The filler batch keeps the since of the batch it was carved from.
        testcase(
            (2, 4, 0, 100),
            &[(0, 3, 200, 0), (3, 4, 0, 0)],
            Ok(&[(0, 2, 200, 0), (2, 4, 0, 100)]),
        );

        // The replacement partially overlaps an empty batch on its upper side.
        testcase(
            (0, 2, 0, 100),
            &[(0, 1, 0, 0), (1, 4, 0, 0)],
            Ok(&[(0, 2, 0, 100), (2, 4, 0, 0)]),
        );

        // Upper-side overlap with a non-empty batch is also an error.
        testcase(
            (0, 2, 0, 100),
            &[(0, 1, 0, 0), (1, 4, 0, 1)],
            Err(
                "overlapping batch was unexpectedly non-empty: HollowBatch { desc: ([1], [4], [0]), parts: [], len: 1, runs: [], run_meta: [] }",
            ),
        );

        // Upper-side partial overlap, with another batch after it.
        testcase(
            (0, 2, 0, 100),
            &[(0, 1, 0, 0), (1, 3, 0, 0), (3, 4, 0, 0)],
            Ok(&[(0, 2, 0, 100), (2, 3, 0, 0), (3, 4, 0, 0)]),
        );

        // Again, the filler batch keeps the since of the original.
        testcase(
            (0, 2, 0, 100),
            &[(0, 1, 0, 0), (1, 4, 200, 0)],
            Ok(&[(0, 2, 0, 100), (2, 4, 200, 0)]),
        );

        // Partial overlap on both sides at once.
        testcase(
            (2, 6, 0, 100),
            &[(0, 3, 0, 0), (3, 5, 0, 0), (5, 8, 0, 0)],
            Ok(&[(0, 2, 0, 0), (2, 6, 0, 100), (6, 8, 0, 0)]),
        );

        // The replacement is entirely after every batch.
        testcase(
            (2, 3, 0, 100),
            &[(0, 1, 0, 0)],
            Err("replacement didn't overlap any batches"),
        );

        // The replacement is entirely before every batch.
        testcase(
            (2, 3, 0, 100),
            &[(4, 5, 0, 0)],
            Err("replacement didn't overlap any batches"),
        );
    }
}