use std::cmp::Ordering;
use std::collections::BTreeMap;
use std::fmt::Debug;
use std::sync::Arc;

use bytes::{Bytes, BytesMut};
use differential_dataflow::lattice::Lattice;
use differential_dataflow::trace::Description;
use mz_ore::assert_none;
use mz_ore::cast::CastFrom;
use mz_persist::location::{SeqNo, VersionedData};
use mz_persist_types::Codec64;
use mz_persist_types::schema::SchemaId;
use mz_proto::TryFromProtoError;
use timely::PartialOrder;
use timely::progress::{Antichain, Timestamp};
use tracing::debug;

use crate::critical::CriticalReaderId;
use crate::internal::paths::PartialRollupKey;
use crate::internal::state::{
    CriticalReaderState, EncodedSchemas, HollowBatch, HollowBlobRef, HollowRollup,
    LeasedReaderState, ProtoStateField, ProtoStateFieldDiffType, ProtoStateFieldDiffs, RunPart,
    State, StateCollections, WriterState,
};
use crate::internal::trace::CompactionInput;
use crate::internal::trace::{FueledMergeRes, SpineId, ThinMerge, ThinSpineBatch, Trace};
use crate::read::LeasedReaderId;
use crate::write::WriterId;
use crate::{Metrics, PersistConfig, ShardId};

use StateFieldValDiff::*;

use super::state::{ActiveGc, ActiveRollup};

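/// The change to a single value in [State], as captured by a [StateDiff].
///
/// Updates and deletes carry the expected previous value so that applying the
/// diff can be validated against the state it is applied to.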
#[derive(Clone, Debug)]
#[cfg_attr(any(test, debug_assertions), derive(PartialEq))]
pub enum StateFieldValDiff<V> {
    Insert(V),
    Update(V, V),
    Delete(V),
}

#[derive(Clone)]
#[cfg_attr(any(test, debug_assertions), derive(PartialEq))]
pub struct StateFieldDiff<K, V> {
    pub key: K,
    pub val: StateFieldValDiff<V>,
}

impl<K: Debug, V: Debug> std::fmt::Debug for StateFieldDiff<K, V> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("StateFieldDiff")
            .field("val", &self.val)
            .field("key", &self.key)
            .finish()
    }
}

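/// A pure, in-memory representation of the difference between two versions of
/// a shard's [State], keyed by field.
///
/// An empty `Vec` for a field means that field is unchanged between
/// `seqno_from` and `seqno_to`.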
#[derive(Debug)]
#[cfg_attr(any(test, debug_assertions), derive(Clone, PartialEq))]
pub struct StateDiff<T> {
    pub(crate) applier_version: semver::Version,
    pub(crate) seqno_from: SeqNo,
    pub(crate) seqno_to: SeqNo,
    pub(crate) walltime_ms: u64,
    pub(crate) latest_rollup_key: PartialRollupKey,
    pub(crate) rollups: Vec<StateFieldDiff<SeqNo, HollowRollup>>,
    pub(crate) active_rollup: Vec<StateFieldDiff<(), ActiveRollup>>,
    pub(crate) active_gc: Vec<StateFieldDiff<(), ActiveGc>>,
    pub(crate) hostname: Vec<StateFieldDiff<(), String>>,
    pub(crate) last_gc_req: Vec<StateFieldDiff<(), SeqNo>>,
    pub(crate) leased_readers: Vec<StateFieldDiff<LeasedReaderId, LeasedReaderState<T>>>,
    pub(crate) critical_readers: Vec<StateFieldDiff<CriticalReaderId, CriticalReaderState<T>>>,
    pub(crate) writers: Vec<StateFieldDiff<WriterId, WriterState<T>>>,
    pub(crate) schemas: Vec<StateFieldDiff<SchemaId, EncodedSchemas>>,
    pub(crate) since: Vec<StateFieldDiff<(), Antichain<T>>>,
    pub(crate) legacy_batches: Vec<StateFieldDiff<HollowBatch<T>, ()>>,
    pub(crate) hollow_batches: Vec<StateFieldDiff<SpineId, Arc<HollowBatch<T>>>>,
    pub(crate) spine_batches: Vec<StateFieldDiff<SpineId, ThinSpineBatch<T>>>,
    pub(crate) merges: Vec<StateFieldDiff<SpineId, ThinMerge<T>>>,
}

impl<T: Timestamp + Codec64> StateDiff<T> {
    pub fn new(
        applier_version: semver::Version,
        seqno_from: SeqNo,
        seqno_to: SeqNo,
        walltime_ms: u64,
        latest_rollup_key: PartialRollupKey,
    ) -> Self {
        StateDiff {
            applier_version,
            seqno_from,
            seqno_to,
            walltime_ms,
            latest_rollup_key,
            rollups: Vec::default(),
            active_rollup: Vec::default(),
            active_gc: Vec::default(),
            hostname: Vec::default(),
            last_gc_req: Vec::default(),
            leased_readers: Vec::default(),
            critical_readers: Vec::default(),
            writers: Vec::default(),
            schemas: Vec::default(),
            since: Vec::default(),
            legacy_batches: Vec::default(),
            hollow_batches: Vec::default(),
            spine_batches: Vec::default(),
            merges: Vec::default(),
        }
    }

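    /// Returns the batch diffs carried by this state diff, unifying the
    /// legacy (keyed by the batch itself) and structured (keyed by [SpineId])
    /// representations.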
    pub fn referenced_batches(&self) -> impl Iterator<Item = StateFieldValDiff<&HollowBatch<T>>> {
        let legacy_batches = self
            .legacy_batches
            .iter()
            .filter_map(|diff| match diff.val {
                Insert(()) => Some(Insert(&diff.key)),
                Update((), ()) => None,
                Delete(()) => Some(Delete(&diff.key)),
            });
        let hollow_batches = self.hollow_batches.iter().map(|diff| match &diff.val {
            Insert(batch) => Insert(&**batch),
            Update(before, after) => Update(&**before, &**after),
            Delete(batch) => Delete(&**batch),
        });
        legacy_batches.chain(hollow_batches)
    }
}

impl<T: Timestamp + Lattice + Codec64> StateDiff<T> {
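    /// Computes the diff between two versions of the same shard's [State],
    /// such that applying the result to `from` yields `to`.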
    pub fn from_diff(from: &State<T>, to: &State<T>) -> Self {
        let State {
            applier_version: _,
            shard_id: from_shard_id,
            seqno: from_seqno,
            hostname: from_hostname,
            walltime_ms: _,
            collections:
                StateCollections {
                    last_gc_req: from_last_gc_req,
                    rollups: from_rollups,
                    active_rollup: from_active_rollup,
                    active_gc: from_active_gc,
                    leased_readers: from_leased_readers,
                    critical_readers: from_critical_readers,
                    writers: from_writers,
                    schemas: from_schemas,
                    trace: from_trace,
                },
        } = from;
        let State {
            applier_version: to_applier_version,
            shard_id: to_shard_id,
            seqno: to_seqno,
            walltime_ms: to_walltime_ms,
            hostname: to_hostname,
            collections:
                StateCollections {
                    last_gc_req: to_last_gc_req,
                    rollups: to_rollups,
                    active_rollup: to_active_rollup,
                    active_gc: to_active_gc,
                    leased_readers: to_leased_readers,
                    critical_readers: to_critical_readers,
                    writers: to_writers,
                    schemas: to_schemas,
                    trace: to_trace,
                },
        } = to;
        assert_eq!(from_shard_id, to_shard_id);

        let (_, latest_rollup) = to.latest_rollup();
        let mut diffs = Self::new(
            to_applier_version.clone(),
            *from_seqno,
            *to_seqno,
            *to_walltime_ms,
            latest_rollup.key.clone(),
        );
        diff_field_single(from_hostname, to_hostname, &mut diffs.hostname);
        diff_field_single(from_last_gc_req, to_last_gc_req, &mut diffs.last_gc_req);
        diff_field_sorted_iter(
            from_active_rollup.iter().map(|r| (&(), r)),
            to_active_rollup.iter().map(|r| (&(), r)),
            &mut diffs.active_rollup,
        );
        diff_field_sorted_iter(
            from_active_gc.iter().map(|g| (&(), g)),
            to_active_gc.iter().map(|g| (&(), g)),
            &mut diffs.active_gc,
        );
        diff_field_sorted_iter(from_rollups.iter(), to_rollups, &mut diffs.rollups);
        diff_field_sorted_iter(
            from_leased_readers.iter(),
            to_leased_readers,
            &mut diffs.leased_readers,
        );
        diff_field_sorted_iter(
            from_critical_readers.iter(),
            to_critical_readers,
            &mut diffs.critical_readers,
        );
        diff_field_sorted_iter(from_writers.iter(), to_writers, &mut diffs.writers);
        diff_field_sorted_iter(from_schemas.iter(), to_schemas, &mut diffs.schemas);
        diff_field_single(from_trace.since(), to_trace.since(), &mut diffs.since);

        let from_flat = from_trace.flatten();
        let to_flat = to_trace.flatten();
        diff_field_sorted_iter(
            from_flat.legacy_batches.iter().map(|(k, v)| (&**k, v)),
            to_flat.legacy_batches.iter().map(|(k, v)| (&**k, v)),
            &mut diffs.legacy_batches,
        );
        diff_field_sorted_iter(
            from_flat.hollow_batches.iter(),
            to_flat.hollow_batches.iter(),
            &mut diffs.hollow_batches,
        );
        diff_field_sorted_iter(
            from_flat.spine_batches.iter(),
            to_flat.spine_batches.iter(),
            &mut diffs.spine_batches,
        );
        diff_field_sorted_iter(
            from_flat.merges.iter(),
            to_flat.merges.iter(),
            &mut diffs.merges,
        );
        diffs
    }

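    /// Returns a reference to every blob (batch or rollup) that is inserted
    /// or updated by this diff.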
    pub(crate) fn blob_inserts(&self) -> impl Iterator<Item = HollowBlobRef<'_, T>> {
        let batches = self
            .referenced_batches()
            .filter_map(|spine_diff| match spine_diff {
                Insert(b) | Update(_, b) => Some(HollowBlobRef::Batch(b)),
                Delete(_) => None,
            });
        let rollups = self
            .rollups
            .iter()
            .filter_map(|rollups_diff| match &rollups_diff.val {
                StateFieldValDiff::Insert(x) | StateFieldValDiff::Update(_, x) => {
                    Some(HollowBlobRef::Rollup(x))
                }
                StateFieldValDiff::Delete(_) => None,
            });
        batches.chain(rollups)
    }

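    /// Returns the batch parts that are no longer referenced after this diff:
    /// parts from deleted or replaced batches that are not also referenced by
    /// an inserted or updated batch (a replacement batch may carry parts over
    /// from its inputs, e.g. after compaction).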
    pub(crate) fn part_deletes(&self) -> impl Iterator<Item = &RunPart<T>> {
        let removed = self
            .referenced_batches()
            .filter_map(|spine_diff| match spine_diff {
                Insert(_) => None,
                Update(a, _) | Delete(a) => Some(a.parts.iter().collect::<Vec<_>>()),
            });

        let added: std::collections::BTreeSet<_> = self
            .referenced_batches()
            .filter_map(|spine_diff| match spine_diff {
                Insert(a) | Update(_, a) => Some(a.parts.iter().collect::<Vec<_>>()),
                Delete(_) => None,
            })
            .flatten()
            .collect();

        removed
            .into_iter()
            .flat_map(|x| x)
            .filter(move |part| !added.contains(part))
    }

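    /// Returns the rollups that are deleted or replaced by this diff.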
    pub(crate) fn rollup_deletes(&self) -> impl Iterator<Item = &HollowRollup> {
        self.rollups
            .iter()
            .filter_map(|rollups_diff| match &rollups_diff.val {
                Insert(_) => None,
                Update(a, _) | Delete(a) => Some(a),
            })
    }

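    /// Test/debug helper: verifies that applying `diff` to `from_state`
    /// reproduces `to_state`, and that `diff` survives a proto round-trip.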
    #[cfg(any(test, debug_assertions))]
    #[allow(dead_code)]
    pub fn validate_roundtrip<K, V, D>(
        metrics: &Metrics,
        from_state: &crate::internal::state::TypedState<K, V, T, D>,
        diff: &Self,
        to_state: &crate::internal::state::TypedState<K, V, T, D>,
    ) -> Result<(), String>
    where
        K: mz_persist_types::Codec + std::fmt::Debug,
        V: mz_persist_types::Codec + std::fmt::Debug,
        D: differential_dataflow::difference::Semigroup + Codec64,
    {
        use mz_proto::RustType;
        use prost::Message;

        use crate::internal::state::ProtoStateDiff;

        let mut roundtrip_state = from_state.clone(
            from_state.applier_version.clone(),
            from_state.hostname.clone(),
        );
        roundtrip_state.apply_diff(metrics, diff.clone())?;

        if &roundtrip_state != to_state {
            return Err(format!(
                "state didn't roundtrip\n from_state {:?}\n to_state {:?}\n rt_state {:?}\n diff {:?}\n",
                from_state, to_state, roundtrip_state, diff
            ));
        }

        let encoded_diff = diff.into_proto().encode_to_vec();
        let roundtrip_diff = Self::from_proto(
            ProtoStateDiff::decode(encoded_diff.as_slice()).map_err(|err| err.to_string())?,
        )
        .map_err(|err| err.to_string())?;

        if &roundtrip_diff != diff {
            return Err(format!(
                "diff didn't roundtrip\n diff {:?}\n rt_diff {:?}",
                diff, roundtrip_diff
            ));
        }

        Ok(())
    }
}

impl<T: Timestamp + Lattice + Codec64> State<T> {
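    /// Decodes and applies a run of encoded diffs to this state, skipping any
    /// whose seqno doesn't directly follow the last applied one.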
    pub fn apply_encoded_diffs<'a, I: IntoIterator<Item = &'a VersionedData>>(
        &mut self,
        cfg: &PersistConfig,
        metrics: &Metrics,
        diffs: I,
    ) {
        let mut state_seqno = self.seqno;
        let diffs = diffs.into_iter().filter_map(move |x| {
            if x.seqno != state_seqno.next() {
                return None;
            }
            let data = x.data.clone();
            let diff = metrics
                .codecs
                .state_diff
                .decode(|| StateDiff::decode(&cfg.build_version, x.data.clone()));
            assert_eq!(diff.seqno_from, state_seqno);
            state_seqno = diff.seqno_to;
            Some((diff, data))
        });
        self.apply_diffs(metrics, diffs);
    }
}

impl<T: Timestamp + Lattice + Codec64> State<T> {
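    /// Applies a batch of already-decoded diffs to this state, panicking if
    /// any of them fails to apply cleanly.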
    pub fn apply_diffs<I: IntoIterator<Item = (StateDiff<T>, Bytes)>>(
        &mut self,
        metrics: &Metrics,
        diffs: I,
    ) {
        for (diff, data) in diffs {
            match self.apply_diff(metrics, diff) {
                Ok(()) => {}
                Err(err) => {
                    let diff = StateDiff::<T>::decode(&self.applier_version, data);
                    panic!(
                        "state diff should apply cleanly: {} diff {:?} state {:?}",
                        err, diff, self
                    )
                }
            }
        }
    }

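    /// Applies a single diff to this state.
    ///
    /// This is a no-op if the diff was already applied (its `seqno_to` equals
    /// the current seqno) and an error if it doesn't start at the current
    /// seqno.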
    pub(super) fn apply_diff(
        &mut self,
        metrics: &Metrics,
        diff: StateDiff<T>,
    ) -> Result<(), String> {
        let StateDiff {
            applier_version: diff_applier_version,
            seqno_from: diff_seqno_from,
            seqno_to: diff_seqno_to,
            walltime_ms: diff_walltime_ms,
            latest_rollup_key: _,
            rollups: diff_rollups,
            active_rollup: diff_active_rollup,
            active_gc: diff_active_gc,
            hostname: diff_hostname,
            last_gc_req: diff_last_gc_req,
            leased_readers: diff_leased_readers,
            critical_readers: diff_critical_readers,
            writers: diff_writers,
            schemas: diff_schemas,
            since: diff_since,
            legacy_batches: diff_legacy_batches,
            hollow_batches: diff_hollow_batches,
            spine_batches: diff_spine_batches,
            merges: diff_merges,
        } = diff;
        if self.seqno == diff_seqno_to {
            return Ok(());
        }
        if self.seqno != diff_seqno_from {
            return Err(format!(
                "could not apply diff {} -> {} to state {}",
                diff_seqno_from, diff_seqno_to, self.seqno
            ));
        }
        self.seqno = diff_seqno_to;
        self.applier_version = diff_applier_version;
        self.walltime_ms = diff_walltime_ms;
        force_apply_diffs_single(
            &self.shard_id,
            diff_seqno_to,
            "hostname",
            diff_hostname,
            &mut self.hostname,
            metrics,
        )?;

        let StateCollections {
            last_gc_req,
            rollups,
            active_rollup,
            active_gc,
            leased_readers,
            critical_readers,
            writers,
            schemas,
            trace,
        } = &mut self.collections;

        apply_diffs_map("rollups", diff_rollups, rollups)?;
        apply_diffs_single("last_gc_req", diff_last_gc_req, last_gc_req)?;
        apply_diffs_single_option("active_rollup", diff_active_rollup, active_rollup)?;
        apply_diffs_single_option("active_gc", diff_active_gc, active_gc)?;
        apply_diffs_map("leased_readers", diff_leased_readers, leased_readers)?;
        apply_diffs_map("critical_readers", diff_critical_readers, critical_readers)?;
        apply_diffs_map("writers", diff_writers, writers)?;
        apply_diffs_map("schemas", diff_schemas, schemas)?;

        let structure_unchanged = diff_hollow_batches.is_empty()
            && diff_spine_batches.is_empty()
            && diff_merges.is_empty();
        let spine_unchanged =
            diff_since.is_empty() && diff_legacy_batches.is_empty() && structure_unchanged;

        if spine_unchanged {
            return Ok(());
        }

        let mut flat = if trace.roundtrip_structure {
            metrics.state.apply_spine_flattened.inc();
            let mut flat = trace.flatten();
            apply_diffs_single("since", diff_since, &mut flat.since)?;
            apply_diffs_map(
                "legacy_batches",
                diff_legacy_batches
                    .into_iter()
                    .map(|StateFieldDiff { key, val }| StateFieldDiff {
                        key: Arc::new(key),
                        val,
                    }),
                &mut flat.legacy_batches,
            )?;
            Some(flat)
        } else {
            for x in diff_since {
                match x.val {
                    Update(from, to) => {
                        if trace.since() != &from {
                            return Err(format!(
                                "since update didn't match: {:?} vs {:?}",
                                self.collections.trace.since(),
                                &from
                            ));
                        }
                        trace.downgrade_since(&to);
                    }
                    Insert(_) => return Err("cannot insert since field".to_string()),
                    Delete(_) => return Err("cannot delete since field".to_string()),
                }
            }
            if !diff_legacy_batches.is_empty() {
                apply_diffs_spine(metrics, diff_legacy_batches, trace)?;
                debug_assert_eq!(trace.validate(), Ok(()), "{:?}", trace);
            }
            None
        };

        if !structure_unchanged {
            let flat = flat.get_or_insert_with(|| trace.flatten());
            apply_diffs_map(
                "hollow_batches",
                diff_hollow_batches,
                &mut flat.hollow_batches,
            )?;
            apply_diffs_map("spine_batches", diff_spine_batches, &mut flat.spine_batches)?;
            apply_diffs_map("merges", diff_merges, &mut flat.merges)?;
        }

        if let Some(flat) = flat {
            *trace = Trace::unflatten(flat)?;
        }

        Ok(())
    }
}

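/// Records an update diff for a single-valued field, if `from` and `to`
/// differ.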
fn diff_field_single<T: PartialEq + Clone>(
    from: &T,
    to: &T,
    diffs: &mut Vec<StateFieldDiff<(), T>>,
) {
    if from != to {
        diffs.push(StateFieldDiff {
            key: (),
            val: Update(from.clone(), to.clone()),
        })
    }
}

fn apply_diffs_single_option<X: PartialEq + Debug>(
    name: &str,
    diffs: Vec<StateFieldDiff<(), X>>,
    single: &mut Option<X>,
) -> Result<(), String> {
    for diff in diffs {
        apply_diff_single_option(name, diff, single)?;
    }
    Ok(())
}

fn apply_diff_single_option<X: PartialEq + Debug>(
    name: &str,
    diff: StateFieldDiff<(), X>,
    single: &mut Option<X>,
) -> Result<(), String> {
    match diff.val {
        Update(from, to) => {
            if single.as_ref() != Some(&from) {
                return Err(format!(
                    "{} update didn't match: {:?} vs {:?}",
                    name, single, &from
                ));
            }
            *single = Some(to)
        }
        Insert(to) => {
            if single.is_some() {
                return Err(format!("{} insert found existing value", name));
            }
            *single = Some(to)
        }
        Delete(from) => {
            if single.as_ref() != Some(&from) {
                return Err(format!(
                    "{} delete didn't match: {:?} vs {:?}",
                    name, single, &from
                ));
            }
            *single = None
        }
    }
    Ok(())
}

fn apply_diffs_single<X: PartialEq + Debug>(
    name: &str,
    diffs: Vec<StateFieldDiff<(), X>>,
    single: &mut X,
) -> Result<(), String> {
    for diff in diffs {
        apply_diff_single(name, diff, single)?;
    }
    Ok(())
}

fn apply_diff_single<X: PartialEq + Debug>(
    name: &str,
    diff: StateFieldDiff<(), X>,
    single: &mut X,
) -> Result<(), String> {
    match diff.val {
        Update(from, to) => {
            if single != &from {
                return Err(format!(
                    "{} update didn't match: {:?} vs {:?}",
                    name, single, &from
                ));
            }
            *single = to
        }
        Insert(_) => return Err(format!("cannot insert {} field", name)),
        Delete(_) => return Err(format!("cannot delete {} field", name)),
    }
    Ok(())
}

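/// Like [apply_diffs_single], but if an update's expected previous value
/// doesn't match, the new value is force-applied (with a debug log and a
/// metric bump) instead of returning an error. In this file it's only used
/// for the informational `hostname` field.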
fn force_apply_diffs_single<X: PartialEq + Debug>(
    shard_id: &ShardId,
    seqno: SeqNo,
    name: &str,
    diffs: Vec<StateFieldDiff<(), X>>,
    single: &mut X,
    metrics: &Metrics,
) -> Result<(), String> {
    for diff in diffs {
        force_apply_diff_single(shard_id, seqno, name, diff, single, metrics)?;
    }
    Ok(())
}

fn force_apply_diff_single<X: PartialEq + Debug>(
    shard_id: &ShardId,
    seqno: SeqNo,
    name: &str,
    diff: StateFieldDiff<(), X>,
    single: &mut X,
    metrics: &Metrics,
) -> Result<(), String> {
    match diff.val {
        Update(from, to) => {
            if single != &from {
                debug!(
                    "{}: update didn't match: {:?} vs {:?}, continuing to force apply diff to {:?} for shard {} and seqno {}",
                    name, single, &from, &to, shard_id, seqno
                );
                metrics.state.force_apply_hostname.inc();
            }
            *single = to
        }
        Insert(_) => return Err(format!("cannot insert {} field", name)),
        Delete(_) => return Err(format!("cannot delete {} field", name)),
    }
    Ok(())
}

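/// Computes the diff between two sorted key-value iterators with a
/// merge-join: keys only in `from` become deletes, keys only in `to` become
/// inserts, and keys in both whose values differ become updates.
///
/// A sketch of the semantics (not a doctest; this helper is private to the
/// module):
///
/// ```ignore
/// let from = std::collections::BTreeMap::from([(1, "a"), (2, "b")]);
/// let to = std::collections::BTreeMap::from([(2, "c"), (3, "d")]);
/// let mut diffs = Vec::new();
/// diff_field_sorted_iter(from.iter(), to.iter(), &mut diffs);
/// // diffs now holds: Delete of (1, "a"), Update of 2 from "b" to "c",
/// // Insert of (3, "d").
/// ```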
fn diff_field_sorted_iter<'a, K, V, IF, IT>(from: IF, to: IT, diffs: &mut Vec<StateFieldDiff<K, V>>)
where
    K: Ord + Clone + 'a,
    V: PartialEq + Clone + 'a,
    IF: IntoIterator<Item = (&'a K, &'a V)>,
    IT: IntoIterator<Item = (&'a K, &'a V)>,
{
    let (mut from, mut to) = (from.into_iter(), to.into_iter());
    let (mut f, mut t) = (from.next(), to.next());
    loop {
        match (f, t) {
            (None, None) => break,
            (Some((fk, fv)), Some((tk, tv))) => match fk.cmp(tk) {
                Ordering::Less => {
                    diffs.push(StateFieldDiff {
                        key: fk.clone(),
                        val: Delete(fv.clone()),
                    });
                    let f_next = from.next();
                    debug_assert!(f_next.as_ref().map_or(true, |(fk_next, _)| fk_next > &fk));
                    f = f_next;
                }
                Ordering::Greater => {
                    diffs.push(StateFieldDiff {
                        key: tk.clone(),
                        val: Insert(tv.clone()),
                    });
                    let t_next = to.next();
                    debug_assert!(t_next.as_ref().map_or(true, |(tk_next, _)| tk_next > &tk));
                    t = t_next;
                }
                Ordering::Equal => {
                    if fv != tv {
                        diffs.push(StateFieldDiff {
                            key: fk.clone(),
                            val: Update(fv.clone(), tv.clone()),
                        });
                    }
                    let f_next = from.next();
                    debug_assert!(f_next.as_ref().map_or(true, |(fk_next, _)| fk_next > &fk));
                    f = f_next;
                    let t_next = to.next();
                    debug_assert!(t_next.as_ref().map_or(true, |(tk_next, _)| tk_next > &tk));
                    t = t_next;
                }
            },
            (None, Some((tk, tv))) => {
                diffs.push(StateFieldDiff {
                    key: tk.clone(),
                    val: Insert(tv.clone()),
                });
                let t_next = to.next();
                debug_assert!(t_next.as_ref().map_or(true, |(tk_next, _)| tk_next > &tk));
                t = t_next;
            }
            (Some((fk, fv)), None) => {
                diffs.push(StateFieldDiff {
                    key: fk.clone(),
                    val: Delete(fv.clone()),
                });
                let f_next = from.next();
                debug_assert!(f_next.as_ref().map_or(true, |(fk_next, _)| fk_next > &fk));
                f = f_next;
            }
        }
    }
}

fn apply_diffs_map<K: Ord, V: PartialEq + Debug>(
    name: &str,
    diffs: impl IntoIterator<Item = StateFieldDiff<K, V>>,
    map: &mut BTreeMap<K, V>,
) -> Result<(), String> {
    for diff in diffs {
        apply_diff_map(name, diff, map)?;
    }
    Ok(())
}

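/// Applies a single key-value diff to a map, erroring if the map's current
/// contents don't match what the diff expects.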
fn apply_diff_map<K: Ord, V: PartialEq + Debug>(
    name: &str,
    diff: StateFieldDiff<K, V>,
    map: &mut BTreeMap<K, V>,
) -> Result<(), String> {
    match diff.val {
        Insert(to) => {
            let prev = map.insert(diff.key, to);
            if prev != None {
                return Err(format!("{} insert found existing value: {:?}", name, prev));
            }
        }
        Update(from, to) => {
            let prev = map.insert(diff.key, to);
            if prev.as_ref() != Some(&from) {
                return Err(format!(
                    "{} update didn't match: {:?} vs {:?}",
                    name,
                    prev,
                    Some(from),
                ));
            }
        }
        Delete(from) => {
            let prev = map.remove(&diff.key);
            if prev.as_ref() != Some(&from) {
                return Err(format!(
                    "{} delete didn't match: {:?} vs {:?}",
                    name,
                    prev,
                    Some(from),
                ));
            }
        }
    };
    Ok(())
}

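/// Applies a set of legacy (batch-keyed) diffs to a [Trace].
///
/// Fast paths cover the common cases of appending a batch at the upper and of
/// applying a compaction result; anything else falls back to diffing the full
/// set of batches and rebuilding the trace from scratch.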
fn apply_diffs_spine<T: Timestamp + Lattice + Codec64>(
    metrics: &Metrics,
    mut diffs: Vec<StateFieldDiff<HollowBatch<T>, ()>>,
    trace: &mut Trace<T>,
) -> Result<(), String> {
    if let Some(insert) = sniff_insert(&mut diffs, trace.upper()) {
        // Fast-path: this diff appends a batch at the trace's current upper.
        let () = trace.push_batch_no_merge_reqs(insert);
        // If that insert was the entire diff, we're done.
        if diffs.is_empty() {
            metrics.state.apply_spine_fast_path.inc();
            return Ok(());
        }
    }

    match &diffs[..] {
        [] => return Ok(()),
        [
            StateFieldDiff {
                key: del,
                val: StateFieldValDiff::Delete(()),
            },
            StateFieldDiff {
                key: ins,
                val: StateFieldValDiff::Insert(()),
            },
        ] => {
            // Fast-path: an empty batch had its upper advanced; represent the
            // change by appending an empty batch covering the new range.
            if del.is_empty()
                && ins.is_empty()
                && del.desc.lower() == ins.desc.lower()
                && PartialOrder::less_than(del.desc.upper(), ins.desc.upper())
            {
                let () = trace.push_batch_no_merge_reqs(HollowBatch::empty(Description::new(
                    del.desc.upper().clone(),
                    ins.desc.upper().clone(),
                    Antichain::from_elem(T::minimum()),
                )));
                metrics.state.apply_spine_fast_path.inc();
                return Ok(());
            }
        }
        _ => {}
    }

    if let Some((_inputs, output)) = sniff_compaction(&diffs) {
        let res = FueledMergeRes {
            output,
            input: CompactionInput::Legacy,
            new_active_compaction: None,
        };
        if trace.apply_merge_res_unchecked(&res).applied() {
            metrics.state.apply_spine_fast_path.inc();
            return Ok(());
        }

        let mut batches = Vec::new();
        trace.map_batches(|b| batches.push(b.clone()));

        match apply_compaction_lenient(metrics, batches, &res.output) {
            Ok(batches) => {
                let mut new_trace = Trace::default();
                new_trace.roundtrip_structure = trace.roundtrip_structure;
                new_trace.downgrade_since(trace.since());
                for batch in batches {
                    let () = new_trace.push_batch_no_merge_reqs(batch.clone());
                }
                *trace = new_trace;
                metrics.state.apply_spine_slow_path_lenient.inc();
                return Ok(());
            }
            Err(err) => {
                return Err(format!(
                    "lenient compaction result apply unexpectedly failed: {}",
                    err
                ));
            }
        }
    }

    // Something more complicated changed; fall back to applying the diffs to
    // the full set of batches and rebuilding the trace.
    metrics.state.apply_spine_slow_path.inc();
    debug!(
        "apply_diffs_spine didn't hit a fast-path diffs={:?} trace={:?}",
        diffs, trace
    );

    let batches = {
        let mut batches = BTreeMap::new();
        trace.map_batches(|b| assert_none!(batches.insert(b.clone(), ())));
        apply_diffs_map("spine", diffs.clone(), &mut batches).map(|_ok| batches)
    };

    let batches = match batches {
        Ok(batches) => batches,
        Err(err) => {
            metrics
                .state
                .apply_spine_slow_path_with_reconstruction
                .inc();
            debug!(
                "apply_diffs_spine could not apply diffs directly to existing trace batches: {}. diffs={:?} trace={:?}",
                err, diffs, trace
            );
            // The diffs may not line up with the trace's current batch
            // structure (e.g. because of merges applied locally), so rebuild a
            // spine from the trace's batches and try applying the diffs to
            // that instead.
            let mut reconstructed_spine = Trace::default();
            reconstructed_spine.roundtrip_structure = trace.roundtrip_structure;
            trace.map_batches(|b| {
                let () = reconstructed_spine.push_batch_no_merge_reqs(b.clone());
            });

            let mut batches = BTreeMap::new();
            reconstructed_spine.map_batches(|b| assert_none!(batches.insert(b.clone(), ())));
            apply_diffs_map("spine", diffs, &mut batches)?;
            batches
        }
    };

    let mut new_trace = Trace::default();
    new_trace.roundtrip_structure = trace.roundtrip_structure;
    new_trace.downgrade_since(trace.since());
    for (batch, ()) in batches {
        let () = new_trace.push_batch_no_merge_reqs(batch);
    }
    *trace = new_trace;
    Ok(())
}

fn sniff_insert<T: Timestamp + Lattice>(
    diffs: &mut Vec<StateFieldDiff<HollowBatch<T>, ()>>,
    upper: &Antichain<T>,
) -> Option<HollowBatch<T>> {
    for idx in 0..diffs.len() {
        match &diffs[idx] {
            StateFieldDiff {
                key,
                val: StateFieldValDiff::Insert(()),
            } if key.desc.lower() == upper => return Some(diffs.remove(idx).key),
            _ => continue,
        }
    }
    None
}

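/// Heuristically determines whether a set of batch diffs looks like a
/// compaction result: some number of deleted batches replaced by exactly one
/// inserted batch. If so, returns the inputs and the output.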
fn sniff_compaction<'a, T: Timestamp + Lattice>(
    diffs: &'a [StateFieldDiff<HollowBatch<T>, ()>],
) -> Option<(Vec<&'a HollowBatch<T>>, HollowBatch<T>)> {
    let mut inserts = diffs.iter().flat_map(|x| match x.val {
        StateFieldValDiff::Insert(()) => Some(&x.key),
        _ => None,
    });
    let compaction_output = match inserts.next() {
        Some(x) => x,
        None => return None,
    };
    if let Some(_) = inserts.next() {
        return None;
    }

    let mut compaction_inputs = Vec::with_capacity(diffs.len() - 1);
    for diff in diffs.iter() {
        match diff.val {
            StateFieldValDiff::Delete(()) => {
                compaction_inputs.push(&diff.key);
            }
            StateFieldValDiff::Insert(()) => {}
            StateFieldValDiff::Update((), ()) => {
                return None;
            }
        }
    }

    Some((compaction_inputs, compaction_output.clone()))
}

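/// Applies a compaction replacement to a flat list of batches, tolerating
/// replacement bounds that don't line up exactly with existing batch
/// boundaries.
///
/// Batches overlapping the replacement are removed; if the replacement
/// doesn't fully cover the first or last of them, the uncovered range must be
/// empty and is filled back in with an empty batch. The result is validated
/// to tile the time domain with no gaps or overlaps.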
fn apply_compaction_lenient<'a, T: Timestamp + Lattice>(
    metrics: &Metrics,
    mut trace: Vec<HollowBatch<T>>,
    replacement: &'a HollowBatch<T>,
) -> Result<Vec<HollowBatch<T>>, String> {
    let mut overlapping_batches = Vec::new();
    trace.retain(|b| {
        let before_replacement = PartialOrder::less_equal(b.desc.upper(), replacement.desc.lower());
        let after_replacement = PartialOrder::less_equal(replacement.desc.upper(), b.desc.lower());
        let overlaps_replacement = !(before_replacement || after_replacement);
        if overlaps_replacement {
            overlapping_batches.push(b.clone());
            false
        } else {
            true
        }
    });

    {
        let first_overlapping_batch = match overlapping_batches.first() {
            Some(x) => x,
            None => return Err("replacement didn't overlap any batches".into()),
        };
        if PartialOrder::less_than(
            first_overlapping_batch.desc.lower(),
            replacement.desc.lower(),
        ) {
            if first_overlapping_batch.len > 0 {
                return Err(format!(
                    "overlapping batch was unexpectedly non-empty: {:?}",
                    first_overlapping_batch
                ));
            }
            let desc = Description::new(
                first_overlapping_batch.desc.lower().clone(),
                replacement.desc.lower().clone(),
                first_overlapping_batch.desc.since().clone(),
            );
            trace.push(HollowBatch::empty(desc));
            metrics.state.apply_spine_slow_path_lenient_adjustment.inc();
        }
    }

    {
        let last_overlapping_batch = match overlapping_batches.last() {
            Some(x) => x,
            None => return Err("replacement didn't overlap any batches".into()),
        };
        if PartialOrder::less_than(
            replacement.desc.upper(),
            last_overlapping_batch.desc.upper(),
        ) {
            if last_overlapping_batch.len > 0 {
                return Err(format!(
                    "overlapping batch was unexpectedly non-empty: {:?}",
                    last_overlapping_batch
                ));
            }
            let desc = Description::new(
                replacement.desc.upper().clone(),
                last_overlapping_batch.desc.upper().clone(),
                last_overlapping_batch.desc.since().clone(),
            );
            trace.push(HollowBatch::empty(desc));
            metrics.state.apply_spine_slow_path_lenient_adjustment.inc();
        }
    }
    trace.push(replacement.clone());

    trace.sort_by(|a, b| a.desc.lower().elements().cmp(b.desc.lower().elements()));

    let mut expected_lower = &Antichain::from_elem(T::minimum());
    for b in trace.iter() {
        if b.desc.lower() != expected_lower {
            return Err(format!(
                "lower {:?} did not match expected {:?}: {:?}",
                b.desc.lower(),
                expected_lower,
                trace
            ));
        }
        expected_lower = b.desc.upper();
    }
    Ok(trace)
}

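/// A helper for incrementally building up a [ProtoStateFieldDiffs], packing
/// each diff's encoded key/value bytes into a single contiguous buffer.
///
/// A sketch of the intended call pattern for one insert diff; the specific
/// field variant and the `key_proto`/`val_proto` messages here are
/// hypothetical placeholders:
///
/// ```ignore
/// let mut writer = ProtoStateFieldDiffs::default().into_writer();
/// writer.push_field(ProtoStateField::Rollups);
/// writer.push_diff_type(ProtoStateFieldDiffType::Insert);
/// writer.encode_proto(&key_proto);
/// writer.encode_proto(&val_proto);
/// let diffs: ProtoStateFieldDiffs = writer.into_proto();
/// ```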
#[derive(Debug)]
pub struct ProtoStateFieldDiffsWriter {
    data_buf: BytesMut,
    proto: ProtoStateFieldDiffs,
}

impl ProtoStateFieldDiffsWriter {
    pub fn push_field(&mut self, field: ProtoStateField) {
        self.proto.fields.push(i32::from(field));
    }

    pub fn push_diff_type(&mut self, diff_type: ProtoStateFieldDiffType) {
        self.proto.diff_types.push(i32::from(diff_type));
    }

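    /// Encodes `msg` into the shared data buffer and records its encoded
    /// length in `data_lens`, so the bytes can later be sliced back out by
    /// [ProtoStateFieldDiffs::iter].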
    pub fn encode_proto<M: prost::Message>(&mut self, msg: &M) {
        let len_before = self.data_buf.len();
        self.data_buf.reserve(msg.encoded_len());

        msg.encode_raw(&mut self.data_buf);

        let written_len = self.data_buf.len() - len_before;
        self.proto.data_lens.push(u64::cast_from(written_len));
    }

    pub fn into_proto(self) -> ProtoStateFieldDiffs {
        let ProtoStateFieldDiffsWriter {
            data_buf,
            mut proto,
        } = self;

        assert!(proto.data_bytes.is_empty());

        let data_bytes = data_buf.freeze();
        proto.data_bytes = data_bytes;

        proto
    }
}

impl ProtoStateFieldDiffs {
    pub fn into_writer(mut self) -> ProtoStateFieldDiffsWriter {
        let mut data_buf = BytesMut::with_capacity(self.data_bytes.len());

        let existing_data = std::mem::take(&mut self.data_bytes);
        data_buf.extend(existing_data);

        ProtoStateFieldDiffsWriter {
            data_buf,
            proto: self,
        }
    }

    pub fn iter<'a>(&'a self) -> ProtoStateFieldDiffsIter<'a> {
        let len = self.fields.len();
        assert_eq!(self.diff_types.len(), len);

        ProtoStateFieldDiffsIter {
            len,
            diff_idx: 0,
            data_idx: 0,
            data_offset: 0,
            diffs: self,
        }
    }

    pub fn validate(&self) -> Result<(), String> {
        if self.fields.len() != self.diff_types.len() {
            return Err(format!(
                "fields {} and diff_types {} lengths disagree",
                self.fields.len(),
                self.diff_types.len()
            ));
        }

        let mut expected_data_slices = 0;
        for diff_type in self.diff_types.iter() {
            // Each diff encodes a key, plus a before and/or after value
            // depending on its type.
            expected_data_slices += 1;
            match ProtoStateFieldDiffType::try_from(*diff_type) {
                Ok(ProtoStateFieldDiffType::Insert) => expected_data_slices += 1,
                Ok(ProtoStateFieldDiffType::Update) => expected_data_slices += 2,
                Ok(ProtoStateFieldDiffType::Delete) => expected_data_slices += 1,
                Err(_) => return Err(format!("unknown diff_type {}", diff_type)),
            }
        }
        if expected_data_slices != self.data_lens.len() {
            return Err(format!(
                "expected {} data slices got {}",
                expected_data_slices,
                self.data_lens.len()
            ));
        }

        let expected_data_bytes = usize::cast_from(self.data_lens.iter().copied().sum::<u64>());
        if expected_data_bytes != self.data_bytes.len() {
            return Err(format!(
                "expected {} data bytes got {}",
                expected_data_bytes,
                self.data_bytes.len()
            ));
        }

        Ok(())
    }
}

#[derive(Debug)]
pub struct ProtoStateFieldDiff<'a> {
    pub key: &'a [u8],
    pub diff_type: ProtoStateFieldDiffType,
    pub from: &'a [u8],
    pub to: &'a [u8],
}

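/// An iterator over the diffs packed into a [ProtoStateFieldDiffs], yielding
/// borrowed slices into its `data_bytes` buffer.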
pub struct ProtoStateFieldDiffsIter<'a> {
    len: usize,
    diff_idx: usize,
    data_idx: usize,
    data_offset: usize,
    diffs: &'a ProtoStateFieldDiffs,
}

impl<'a> Iterator for ProtoStateFieldDiffsIter<'a> {
    type Item = Result<(ProtoStateField, ProtoStateFieldDiff<'a>), TryFromProtoError>;

    fn next(&mut self) -> Option<Self::Item> {
        if self.diff_idx >= self.len {
            return None;
        }
        let mut next_data = || {
            let start = self.data_offset;
            let end = start + usize::cast_from(self.diffs.data_lens[self.data_idx]);
            let data = &self.diffs.data_bytes[start..end];
            self.data_idx += 1;
            self.data_offset = end;
            data
        };
        let field = match ProtoStateField::try_from(self.diffs.fields[self.diff_idx]) {
            Ok(x) => x,
            Err(_) => {
                return Some(Err(TryFromProtoError::unknown_enum_variant(format!(
                    "ProtoStateField({})",
                    self.diffs.fields[self.diff_idx]
                ))));
            }
        };
        let diff_type =
            match ProtoStateFieldDiffType::try_from(self.diffs.diff_types[self.diff_idx]) {
                Ok(x) => x,
                Err(_) => {
                    return Some(Err(TryFromProtoError::unknown_enum_variant(format!(
                        "ProtoStateFieldDiffType({})",
                        self.diffs.diff_types[self.diff_idx]
                    ))));
                }
            };
        let key = next_data();
        let (from, to): (&[u8], &[u8]) = match diff_type {
            ProtoStateFieldDiffType::Insert => (&[], next_data()),
            ProtoStateFieldDiffType::Update => (next_data(), next_data()),
            ProtoStateFieldDiffType::Delete => (next_data(), &[]),
        };
        let diff = ProtoStateFieldDiff {
            key,
            diff_type,
            from,
            to,
        };
        self.diff_idx += 1;
        Some(Ok((field, diff)))
    }
}

#[cfg(test)]
mod tests {
    use semver::Version;
    use std::ops::ControlFlow::Continue;

    use crate::internal::paths::{PartId, PartialBatchKey, RollupId, WriterKey};
    use mz_ore::metrics::MetricsRegistry;

    use crate::ShardId;
    use crate::internal::state::TypedState;

    use super::*;

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn test_state_sync() {
        use proptest::prelude::*;

        #[derive(Debug, Clone)]
        enum Action {
            Append { empty: bool, time_delta: u64 },
            Compact { req: usize },
        }

        let action_gen: BoxedStrategy<Action> = {
            prop::strategy::Union::new([
                (any::<bool>(), 1u64..10u64)
                    .prop_map(|(empty, time_delta)| Action::Append { empty, time_delta })
                    .boxed(),
                (0usize..10usize)
                    .prop_map(|req| Action::Compact { req })
                    .boxed(),
            ])
            .boxed()
        };

        fn run(actions: Vec<(Action, bool)>, metrics: &Metrics) {
            let version = Version::new(0, 100, 0);
            let writer_key = WriterKey::Version(version.to_string());
            let id = ShardId::new();
            let hostname = "computer";
            let typed: TypedState<String, (), u64, i64> =
                TypedState::new(version, id, hostname.to_string(), 0);
            let mut leader = typed.state;

            let seqno = SeqNo::minimum();
            let mut lower = 0u64;
            let mut merge_reqs = vec![];

            leader.collections.rollups.insert(
                seqno,
                HollowRollup {
                    key: PartialRollupKey::new(seqno, &RollupId::new()),
                    encoded_size_bytes: None,
                },
            );
            leader.collections.trace.roundtrip_structure = false;
            let mut follower = leader.clone();

            for (action, roundtrip_structure) in actions {
                let mut old_leader = leader.clone();
                match action {
                    Action::Append { empty, time_delta } => {
                        let upper = lower + time_delta;
                        let key = if empty {
                            None
                        } else {
                            let id = PartId::new();
                            Some(PartialBatchKey::new(&writer_key, &id))
                        };

                        let keys = key.as_ref().map(|k| k.0.as_str());
                        let reqs = leader.collections.trace.push_batch(
                            crate::internal::state::tests::hollow(
                                lower,
                                upper,
                                keys.as_slice(),
                                if empty { 0 } else { 1 },
                            ),
                        );
                        merge_reqs.extend(reqs);
                        lower = upper;
                    }
                    Action::Compact { req } => {
                        if !merge_reqs.is_empty() {
                            let req = merge_reqs.remove(req.min(merge_reqs.len() - 1));
                            let len = req.inputs.iter().map(|p| p.batch.len).sum();
                            let parts = req
                                .inputs
                                .into_iter()
                                .flat_map(|p| p.batch.parts.clone())
                                .collect();
                            let output = HollowBatch::new_run(req.desc, parts, len);
                            leader
                                .collections
                                .trace
                                .apply_merge_res_unchecked(&FueledMergeRes {
                                    output,
                                    input: CompactionInput::Legacy,
                                    new_active_compaction: None,
                                });
                        }
                    }
                }
                leader.collections.trace.roundtrip_structure = roundtrip_structure;
                leader.seqno.0 += 1;
                let diff = StateDiff::from_diff(&old_leader, &leader);

                old_leader
                    .apply_diff(metrics, diff.clone())
                    .expect("diff applies to the old version of the leader state");
                follower
                    .apply_diff(metrics, diff.clone())
                    .expect("diff applies to the synced version of the follower state");
            }
        }

        let config = PersistConfig::new_for_tests();
        let metrics_registry = MetricsRegistry::new();
        let metrics: Metrics = Metrics::new(&config, &metrics_registry);

        proptest!(|(actions in prop::collection::vec((action_gen, any::<bool>()), 1..20))| {
            run(actions, &metrics)
        })
    }

    #[mz_ore::test]
    fn regression_15493_sniff_insert() {
        fn hb(lower: u64, upper: u64, len: usize) -> HollowBatch<u64> {
            HollowBatch::new_run(
                Description::new(
                    Antichain::from_elem(lower),
                    Antichain::from_elem(upper),
                    Antichain::from_elem(0),
                ),
                Vec::new(),
                len,
            )
        }

        let batches_before = vec![hb(0, 7094664, 0), hb(7094664, 7185234, 100)];

        let diffs = vec![
            StateFieldDiff {
                key: hb(0, 6805359, 0),
                val: StateFieldValDiff::Delete(()),
            },
            StateFieldDiff {
                key: hb(6805359, 7083793, 0),
                val: StateFieldValDiff::Delete(()),
            },
            StateFieldDiff {
                key: hb(0, 7083793, 0),
                val: StateFieldValDiff::Insert(()),
            },
            StateFieldDiff {
                key: hb(7185234, 7185859, 20),
                val: StateFieldValDiff::Insert(()),
            },
        ];

        let batches_after = vec![
            hb(0, 7094664, 0),
            hb(7094664, 7185234, 100),
            hb(7185234, 7185859, 20),
        ];

        let cfg = PersistConfig::new_for_tests();
        let state = TypedState::<(), (), u64, i64>::new(
            cfg.build_version.clone(),
            ShardId::new(),
            cfg.hostname.clone(),
            (cfg.now)(),
        );
        let state = state.clone_apply(&cfg, &mut |_seqno, _cfg, state| {
            for b in batches_before.iter() {
                let _merge_reqs = state.trace.push_batch(b.clone());
            }
            Continue::<(), ()>(())
        });
        let mut state = match state {
            Continue((_, x)) => x,
            _ => unreachable!(),
        };

        let metrics = Metrics::new(&PersistConfig::new_for_tests(), &MetricsRegistry::new());
        assert_eq!(
            apply_diffs_spine(&metrics, diffs, &mut state.collections.trace),
            Ok(())
        );

        let mut actual = Vec::new();
        state
            .collections
            .trace
            .map_batches(|b| actual.push(b.clone()));
        assert_eq!(actual, batches_after);
    }

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn apply_lenient() {
        #[track_caller]
        fn testcase(
            replacement: (u64, u64, u64, usize),
            spine: &[(u64, u64, u64, usize)],
            expected: Result<&[(u64, u64, u64, usize)], &str>,
        ) {
            fn batch(x: &(u64, u64, u64, usize)) -> HollowBatch<u64> {
                let (lower, upper, since, len) = x;
                let desc = Description::new(
                    Antichain::from_elem(*lower),
                    Antichain::from_elem(*upper),
                    Antichain::from_elem(*since),
                );
                HollowBatch::new_run(desc, Vec::new(), *len)
            }
            let replacement = batch(&replacement);
            let batches = spine.iter().map(batch).collect::<Vec<_>>();

            let metrics = Metrics::new(&PersistConfig::new_for_tests(), &MetricsRegistry::new());
            let actual = apply_compaction_lenient(&metrics, batches, &replacement);
            let expected = match expected {
                Ok(batches) => Ok(batches.iter().map(batch).collect::<Vec<_>>()),
                Err(err) => Err(err.to_owned()),
            };
            assert_eq!(actual, expected);
        }

        testcase(
            (0, 3, 0, 100),
            &[(0, 1, 0, 0), (1, 2, 0, 0), (2, 3, 0, 0)],
            Ok(&[(0, 3, 0, 100)]),
        );

        testcase(
            (1, 2, 0, 100),
            &[(0, 3, 0, 0)],
            Ok(&[(0, 1, 0, 0), (1, 2, 0, 100), (2, 3, 0, 0)]),
        );

        testcase(
            (2, 4, 0, 100),
            &[(0, 3, 0, 0), (3, 4, 0, 0)],
            Ok(&[(0, 2, 0, 0), (2, 4, 0, 100)]),
        );

        testcase(
            (2, 4, 0, 100),
            &[(0, 3, 0, 1), (3, 4, 0, 0)],
            Err(
                "overlapping batch was unexpectedly non-empty: HollowBatch { desc: ([0], [3], [0]), parts: [], len: 1, runs: [], run_meta: [] }",
            ),
        );

        testcase(
            (2, 4, 0, 100),
            &[(0, 1, 0, 0), (1, 3, 0, 0), (3, 4, 0, 0)],
            Ok(&[(0, 1, 0, 0), (1, 2, 0, 0), (2, 4, 0, 100)]),
        );

        testcase(
            (2, 4, 0, 100),
            &[(0, 3, 200, 0), (3, 4, 0, 0)],
            Ok(&[(0, 2, 200, 0), (2, 4, 0, 100)]),
        );

        testcase(
            (0, 2, 0, 100),
            &[(0, 1, 0, 0), (1, 4, 0, 0)],
            Ok(&[(0, 2, 0, 100), (2, 4, 0, 0)]),
        );

        testcase(
            (0, 2, 0, 100),
            &[(0, 1, 0, 0), (1, 4, 0, 1)],
            Err(
                "overlapping batch was unexpectedly non-empty: HollowBatch { desc: ([1], [4], [0]), parts: [], len: 1, runs: [], run_meta: [] }",
            ),
        );

        testcase(
            (0, 2, 0, 100),
            &[(0, 1, 0, 0), (1, 3, 0, 0), (3, 4, 0, 0)],
            Ok(&[(0, 2, 0, 100), (2, 3, 0, 0), (3, 4, 0, 0)]),
        );

        testcase(
            (0, 2, 0, 100),
            &[(0, 1, 0, 0), (1, 4, 200, 0)],
            Ok(&[(0, 2, 0, 100), (2, 4, 200, 0)]),
        );

        testcase(
            (2, 6, 0, 100),
            &[(0, 3, 0, 0), (3, 5, 0, 0), (5, 8, 0, 0)],
            Ok(&[(0, 2, 0, 0), (2, 6, 0, 100), (6, 8, 0, 0)]),
        );

        testcase(
            (2, 3, 0, 100),
            &[(0, 1, 0, 0)],
            Err("replacement didn't overlap any batches"),
        );

        testcase(
            (2, 3, 0, 100),
            &[(4, 5, 0, 0)],
            Err("replacement didn't overlap any batches"),
        );
    }
}