differential_dataflow/trace/implementations/spine_fueled.rs
//! An append-only collection of update batches.
//!
//! The `Spine` is a general-purpose trace implementation based on collecting and merging
//! immutable batches of updates. It is generic with respect to the batch type, and can be
//! instantiated for any implementor of `trace::Batch`.
//!
//! ## Design
//!
//! This spine is represented as a list of layers, where each element in the list is either
//!
//! 1. `MergeState::Vacant`: empty
//! 2. `MergeState::Single`: a single batch
//! 3. `MergeState::Double`: a pair of batches
//!
//! Each "batch" has the option to be `None`, indicating a non-batch that nonetheless acts
//! as a number of updates proportionate to the level at which it exists (for bookkeeping).
//!
//! Each of the batches at layer i contains at most 2^i elements. The sequence of batches
//! should have the upper bound of one match the lower bound of the next. Batches may be
//! logically empty, with matching upper and lower bounds, as a bookkeeping mechanism.
//!
//! Each batch at layer i is treated as if it contains exactly 2^i elements, even though it
//! may actually contain fewer elements. This allows us to decouple the physical representation
//! from logical amounts of effort invested in each batch. It allows us to begin compaction and
//! to reduce the number of updates, without compromising our ability to continue to move
//! updates along the spine. We are explicitly making the trade-off that while some batches
//! might compact at lower levels, we want to treat them as if they contained their full set of
//! updates for accounting reasons (to apply work to higher levels).
//!
//! We maintain the invariant that for any in-progress merge at level k there should be fewer
//! than 2^k records at levels lower than k. That is, even if we were to apply an unbounded
//! amount of effort to those records, we would not have enough records to prompt a merge into
//! the in-progress merge. Ideally, we maintain the extended invariant that for any in-progress
//! merge at level k, the remaining effort required (number of records minus applied effort) is
//! less than the number of records that would need to be added to reach 2^k records in layers
//! below.
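/*!
A minimal sketch of the level arithmetic described above. It assumes that a batch is introduced
at the level given by `len.next_power_of_two().trailing_zeros()`, the expression this file uses
when it introduces batches. The names `level_for_len` and `nominal_size` are hypothetical
helpers for illustration, not part of this module.

```rust
/// Hypothetical helper: the layer at which a batch of `len` updates would be introduced.
fn level_for_len(len: usize) -> usize {
    len.next_power_of_two().trailing_zeros() as usize
}

/// Hypothetical helper: the number of updates a batch at `level` is accounted as holding.
fn nominal_size(level: usize) -> usize {
    1 << level
}
```

For example, a batch of five updates would land at level 3 and be accounted as eight updates.
*/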
//!
//! ## Mathematics
//!
//! When a merge is initiated, there should be a non-negative *deficit* of updates before the layers
//! below could plausibly produce a new batch for the currently merging layer. We must determine a
//! factor of proportionality, so that newly arrived updates provide at least that amount of "fuel"
//! towards the merging layer, so that the merge completes before lower levels invade.
//!
//! ### Deficit:
//!
//! A new merge is initiated only in response to the completion of a prior merge, or the introduction
//! of new records from outside. The latter case is special, and will maintain our invariant trivially,
//! so we will focus on the former case.
//!
//! When a merge at level k completes, assuming we have maintained our invariant then there should be
//! fewer than 2^k records at lower levels. The newly created merge at level k+1 will require up to
//! 2^(k+2) units of work, and should not expect a new batch until strictly more than 2^k records are
//! added. This means that a factor of proportionality of four should be sufficient to ensure that
//! the merge completes before a new merge is initiated.
//!
//! When new records get introduced, we will need to roll up any batches at lower levels, which we
//! treat as the introduction of records. Each of these virtual records should be accounted for
//! in the fuel it contributes, as it results in the promotion of batches closer to
//! in-progress merges.
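/*!
The factor of four can be checked numerically. The sketch below only illustrates the accounting
in this section. `merge_work_bound` and `fuel_from` are hypothetical names, and the bound
assumes that each of the two batches in a merge at level k+1 is accounted as 2^(k+1) updates.

```rust
/// Upper bound on the work for a merge newly created at level `k + 1`:
/// two batches accounted as 2^(k+1) updates each, i.e. 2^(k+2) in total.
fn merge_work_bound(k: u32) -> u64 {
    1u64 << (k + 2)
}

/// Fuel provided by `records` newly arrived updates at a given factor of proportionality.
fn fuel_from(records: u64, factor: u64) -> u64 {
    records * factor
}
```

With a factor of four, `fuel_from(1 << k, 4)` equals `merge_work_bound(k)` for every k, whereas
a factor of three falls short.
*/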
//!
//! ### Fuel sharing
//!
//! We like the idea of applying fuel preferentially to merges at *lower* levels, under the idea that
//! they are easier to complete, and we benefit from fewer total merges in progress. This does delay
//! the completion of merges at higher levels, and may not obviously be a total win. If we choose to
//! do this, we should make sure that we correctly account for completed merges at low layers: they
//! should still extract fuel from new updates even though they have completed, at least until they
//! have paid back any "debt" to higher layers by continuing to provide fuel as updates arrive.


use crate::logging::Logger;
use crate::trace::{Batch, BatchReader, Trace, TraceReader, ExertionLogic};
use crate::trace::cursor::CursorList;
use crate::trace::Merger;

use ::timely::dataflow::operators::generic::OperatorInfo;
use ::timely::progress::{Antichain, frontier::AntichainRef};
use ::timely::order::PartialOrder;

/// An append-only collection of update tuples.
///
/// A spine maintains a small number of immutable collections of update tuples, merging the collections when
/// two have similar sizes. In this way, it allows the addition of more tuples, which may then be merged with
/// other immutable collections.
pub struct Spine<B: Batch> {
    operator: OperatorInfo,
    logger: Option<Logger>,
    logical_frontier: Antichain<B::Time>, // Times after which the trace must accumulate correctly.
    physical_frontier: Antichain<B::Time>, // Times after which the trace must be able to subset its inputs.
    merging: Vec<MergeState<B>>, // Several possibly shared collections of updates.
    pending: Vec<B>, // Batches at times in advance of `frontier`.
    upper: Antichain<B::Time>,
    effort: usize,
    activator: Option<timely::scheduling::activate::Activator>,
    /// Parameters to `exert_logic`, containing tuples of `(index, count, length)`.
    exert_logic_param: Vec<(usize, usize, usize)>,
    /// Logic to indicate whether and how many records we should introduce in the absence of actual updates.
    exert_logic: Option<ExertionLogic>,
}

impl<B> TraceReader for Spine<B>
where
    B: Batch+Clone+'static,
{
    type Key<'a> = B::Key<'a>;
    type Val<'a> = B::Val<'a>;
    type Time = B::Time;
    type TimeGat<'a> = B::TimeGat<'a>;
    type Diff = B::Diff;
    type DiffGat<'a> = B::DiffGat<'a>;

    type Batch = B;
    type Storage = Vec<B>;
    type Cursor = CursorList<<B as BatchReader>::Cursor>;

    fn cursor_through(&mut self, upper: AntichainRef<Self::Time>) -> Option<(Self::Cursor, Self::Storage)> {

        // If `upper` is the minimum frontier, we can return an empty cursor.
        // This can happen with operators that are written to expect the ability to acquire cursors
        // for their prior frontiers, and which start at `[T::minimum()]`, such as `Reduce`, sadly.
        if upper.less_equal(&<Self::Time as timely::progress::Timestamp>::minimum()) {
            let cursors = Vec::new();
            let storage = Vec::new();
            return Some((CursorList::new(cursors, &storage), storage));
        }

        // The supplied `upper` should have the property that for each of our
        // batch `lower` and `upper` frontiers, the supplied upper is comparable
        // to the frontier; it should not be incomparable, because the frontiers
        // that we created form a total order. If it is, there is a bug.
        //
        // We should acquire a cursor including all batches whose upper is less
        // or equal to the supplied upper, excluding all batches whose lower is
        // greater or equal to the supplied upper, and if a batch straddles the
        // supplied upper it had better be empty.

        // We shouldn't grab a cursor into a closed trace, right?
        assert!(self.logical_frontier.borrow().len() > 0);

        // Check that `upper` is greater or equal to `self.physical_frontier`.
        // Otherwise, the cut could be in `self.merging` and it is user error anyhow.
        // assert!(upper.iter().all(|t1| self.physical_frontier.iter().any(|t2| t2.less_equal(t1))));
        assert!(PartialOrder::less_equal(&self.physical_frontier.borrow(), &upper));

        let mut cursors = Vec::new();
        let mut storage = Vec::new();

        for merge_state in self.merging.iter().rev() {
            match merge_state {
                MergeState::Double(variant) => {
                    match variant {
                        MergeVariant::InProgress(batch1, batch2, _) => {
                            if !batch1.is_empty() {
                                cursors.push(batch1.cursor());
                                storage.push(batch1.clone());
                            }
                            if !batch2.is_empty() {
                                cursors.push(batch2.cursor());
                                storage.push(batch2.clone());
                            }
                        },
                        MergeVariant::Complete(Some((batch, _))) => {
                            if !batch.is_empty() {
                                cursors.push(batch.cursor());
                                storage.push(batch.clone());
                            }
                        }
                        MergeVariant::Complete(None) => { },
                    }
                },
                MergeState::Single(Some(batch)) => {
                    if !batch.is_empty() {
                        cursors.push(batch.cursor());
                        storage.push(batch.clone());
                    }
                },
                MergeState::Single(None) => { },
                MergeState::Vacant => { },
            }
        }

        for batch in self.pending.iter() {

            if !batch.is_empty() {

                // For a non-empty `batch`, it is a catastrophic error if `upper`
                // requires some-but-not-all of the updates in the batch. We can
                // determine this from `upper` and the lower and upper bounds of
                // the batch itself.
                //
                // TODO: It is not clear if this is the 100% correct logic, due
                // to the possible non-total-orderedness of the frontiers.

                let include_lower = PartialOrder::less_equal(&batch.lower().borrow(), &upper);
                let include_upper = PartialOrder::less_equal(&batch.upper().borrow(), &upper);

                if include_lower != include_upper && upper != batch.lower().borrow() {
                    panic!("`cursor_through`: `upper` straddles batch");
                }

                // include pending batches
                if include_upper {
                    cursors.push(batch.cursor());
                    storage.push(batch.clone());
                }
            }
        }

        Some((CursorList::new(cursors, &storage), storage))
    }
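/*
A simplified model of the straddle check applied to pending batches above. It assumes totally
ordered `u64` times in place of antichains, so that frontier comparison reduces to `<=`. The
names `Decision` and `classify` are hypothetical and exist only for this illustration. The
real code applies the check only to non-empty batches.

```rust
/// Outcome of testing one pending batch against a requested `upper`.
#[derive(Debug, PartialEq)]
enum Decision {
    Include,
    Exclude,
    Straddle,
}

/// Classifies a batch with bounds `batch_lower` and `batch_upper` against `upper`.
fn classify(batch_lower: u64, batch_upper: u64, upper: u64) -> Decision {
    let include_lower = batch_lower <= upper;
    let include_upper = batch_upper <= upper;
    if include_lower != include_upper && upper != batch_lower {
        // The requested cut falls strictly inside the batch.
        Decision::Straddle
    } else if include_upper {
        Decision::Include
    } else {
        Decision::Exclude
    }
}
```

In this model a batch covering times 0 to 10 straddles a requested `upper` of 5, while a batch
starting exactly at 5 is simply excluded.
*/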
    #[inline]
    fn set_logical_compaction(&mut self, frontier: AntichainRef<B::Time>) {
        self.logical_frontier.clear();
        self.logical_frontier.extend(frontier.iter().cloned());
    }
    #[inline]
    fn get_logical_compaction(&mut self) -> AntichainRef<B::Time> { self.logical_frontier.borrow() }
    #[inline]
    fn set_physical_compaction(&mut self, frontier: AntichainRef<B::Time>) {
        // We should never request to rewind the frontier.
        debug_assert!(PartialOrder::less_equal(&self.physical_frontier.borrow(), &frontier), "FAIL\tthrough frontier !<= new frontier {:?} {:?}\n", self.physical_frontier, frontier);
        self.physical_frontier.clear();
        self.physical_frontier.extend(frontier.iter().cloned());
        self.consider_merges();
    }
    #[inline]
    fn get_physical_compaction(&mut self) -> AntichainRef<B::Time> { self.physical_frontier.borrow() }

    #[inline]
    fn map_batches<F: FnMut(&Self::Batch)>(&self, mut f: F) {
        for batch in self.merging.iter().rev() {
            match batch {
                MergeState::Double(MergeVariant::InProgress(batch1, batch2, _)) => { f(batch1); f(batch2); },
                MergeState::Double(MergeVariant::Complete(Some((batch, _)))) => { f(batch) },
                MergeState::Single(Some(batch)) => { f(batch) },
                _ => { },
            }
        }
        for batch in self.pending.iter() {
            f(batch);
        }
    }
}

// A trace implementation for any key type that can be borrowed from or converted into `Key`.
// TODO: Almost all this implementation seems to be generic with respect to the trace and batch types.
impl<B> Trace for Spine<B>
where
    B: Batch+Clone+'static,
{
    fn new(
        info: ::timely::dataflow::operators::generic::OperatorInfo,
        logging: Option<crate::logging::Logger>,
        activator: Option<timely::scheduling::activate::Activator>,
    ) -> Self {
        Self::with_effort(1, info, logging, activator)
    }

    /// Apply some amount of effort to trace maintenance.
    ///
    /// Whether and how much effort to apply is determined by `self.exert_logic`, a closure the user can set.
    fn exert(&mut self) {
        // If there is work to be done, ...
        self.tidy_layers();
        // Determine whether we should apply effort independent of updates.
        if let Some(effort) = self.exert_effort() {

            // If any merges exist, we can directly call `apply_fuel`.
            if self.merging.iter().any(|b| b.is_double()) {
                self.apply_fuel(&mut (effort as isize));
            }
            // Otherwise, we'll need to introduce fake updates to move merges along.
            else {
                // Introduce an empty batch with roughly `effort` virtual updates.
                let level = effort.next_power_of_two().trailing_zeros() as usize;
                self.introduce_batch(None, level);
            }
            // We were not in reduced form, so let's check again in the future.
            if let Some(activator) = &self.activator {
                activator.activate();
            }
        }
    }

    fn set_exert_logic(&mut self, logic: ExertionLogic) {
        self.exert_logic = Some(logic);
    }

    // Ideally, this method acts as insertion of `batch`, even if we are not yet able to begin
    // merging the batch. This means it is a good time to perform amortized work proportional
    // to the size of batch.
    fn insert(&mut self, batch: Self::Batch) {

        // Log the introduction of a batch.
        self.logger.as_ref().map(|l| l.log(crate::logging::BatchEvent {
            operator: self.operator.global_id,
            length: batch.len()
        }));

        assert!(batch.lower() != batch.upper());
        assert_eq!(batch.lower(), &self.upper);

        self.upper.clone_from(batch.upper());

        // TODO: Consolidate or discard empty batches.
        self.pending.push(batch);
        self.consider_merges();
    }

    /// Completes the trace with a final empty batch.
    fn close(&mut self) {
        if !self.upper.borrow().is_empty() {
            self.insert(B::empty(self.upper.clone(), Antichain::new()));
        }
    }
}

// Drop implementation allows us to log batch drops, to zero out maintained totals.
impl<B: Batch> Drop for Spine<B> {
    fn drop(&mut self) {
        self.drop_batches();
    }
}


impl<B: Batch> Spine<B> {
    /// Drops and logs batches. Used when the spine itself is dropped.
    fn drop_batches(&mut self) {
        if let Some(logger) = &self.logger {
            for batch in self.merging.drain(..) {
                match batch {
                    MergeState::Single(Some(batch)) => {
                        logger.log(crate::logging::DropEvent {
                            operator: self.operator.global_id,
                            length: batch.len(),
                        });
                    },
                    MergeState::Double(MergeVariant::InProgress(batch1, batch2, _)) => {
                        logger.log(crate::logging::DropEvent {
                            operator: self.operator.global_id,
                            length: batch1.len(),
                        });
                        logger.log(crate::logging::DropEvent {
                            operator: self.operator.global_id,
                            length: batch2.len(),
                        });
                    },
                    MergeState::Double(MergeVariant::Complete(Some((batch, _)))) => {
                        logger.log(crate::logging::DropEvent {
                            operator: self.operator.global_id,
                            length: batch.len(),
                        });
                    }
                    _ => { },
                }
            }
            for batch in self.pending.drain(..) {
                logger.log(crate::logging::DropEvent {
                    operator: self.operator.global_id,
                    length: batch.len(),
                });
            }
        }
    }
}

impl<B: Batch> Spine<B> {
    /// Determine the amount of effort we should exert in the absence of updates.
    ///
    /// This method prepares a list of layers, recording the level, count, and length of each layer.
    /// It supplies this to `self.exert_logic`, which returns the amount of exertion to apply, if any.
    fn exert_effort(&mut self) -> Option<usize> {
        self.exert_logic.as_ref().and_then(|exert_logic| {
            self.exert_logic_param.clear();
            self.exert_logic_param.extend(self.merging.iter().enumerate().rev().map(|(index, batch)| {
                match batch {
                    MergeState::Vacant => (index, 0, 0),
                    MergeState::Single(_) => (index, 1, batch.len()),
                    MergeState::Double(_) => (index, 2, batch.len()),
                }
            }));

            (exert_logic)(&self.exert_logic_param[..])
        })
    }
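/*
A sketch of what an exertion policy over these parameters might look like. The function name
and the policy itself are hypothetical. The only parts taken from this file are the shape of
the input, a slice of `(index, count, length)` tuples with one entry per layer, and the
`Option<usize>` result. How such a function is wrapped into an `ExertionLogic` value depends on
that type's definition, which lives elsewhere and is not assumed here.

```rust
/// Hypothetical policy: request no effort while the layers hold at most one batch in total,
/// and otherwise request effort equal to the total number of updates held.
fn example_exert_logic(layers: &[(usize, usize, usize)]) -> Option<usize> {
    // `count` is 0 for a vacant layer, 1 for a single batch, 2 for a merge in progress.
    let batches: usize = layers.iter().map(|&(_, count, _)| count).sum();
    if batches <= 1 {
        None
    } else {
        Some(layers.iter().map(|&(_, _, length)| length).sum())
    }
}
```

Returning `None` corresponds to the case where `exert` above does nothing beyond tidying layers.
*/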

    /// Describes the merge progress of layers in the trace.
    ///
    /// Intended for diagnostics rather than public consumption.
    #[allow(dead_code)]
    fn describe(&self) -> Vec<(usize, usize)> {
        self.merging
            .iter()
            .map(|b| match b {
                MergeState::Vacant => (0, 0),
                x @ MergeState::Single(_) => (1, x.len()),
                x @ MergeState::Double(_) => (2, x.len()),
            })
            .collect()
    }

    /// Allocates a fueled `Spine` with a specified effort multiplier.
    ///
    /// This trace will merge batches progressively, with each inserted batch applying a multiple
    /// of the batch's length in effort to each merge. The `effort` parameter is that multiplier.
    /// This value should be at least one for the merging to happen; a value of zero is not helpful.
    pub fn with_effort(
        mut effort: usize,
        operator: OperatorInfo,
        logger: Option<crate::logging::Logger>,
        activator: Option<timely::scheduling::activate::Activator>,
    ) -> Self {

        // Zero effort is .. not smart.
        if effort == 0 { effort = 1; }

        Spine {
            operator,
            logger,
            logical_frontier: Antichain::from_elem(<B::Time as timely::progress::Timestamp>::minimum()),
            physical_frontier: Antichain::from_elem(<B::Time as timely::progress::Timestamp>::minimum()),
            merging: Vec::new(),
            pending: Vec::new(),
            upper: Antichain::from_elem(<B::Time as timely::progress::Timestamp>::minimum()),
            effort,
            activator,
            exert_logic_param: Vec::default(),
            exert_logic: None,
        }
    }

    /// Migrate data from `self.pending` into `self.merging`.
    ///
    /// This method reflects on the bookmarks held by others that may prevent merging, and in the
    /// case that new batches can be introduced to the pile of mergeable batches, it gets on that.
    #[inline(never)]
    fn consider_merges(&mut self) {

        // TODO: Consider merging pending batches before introducing them.
        // TODO: We could use a `VecDeque` here to draw from the front and append to the back.
        while !self.pending.is_empty() && PartialOrder::less_equal(self.pending[0].upper(), &self.physical_frontier)
            //   self.physical_frontier.iter().all(|t1| self.pending[0].upper().iter().any(|t2| t2.less_equal(t1)))
        {
            // Batch can be taken in optimized insertion.
            // Otherwise it is inserted normally at the end of the method.
            let mut batch = Some(self.pending.remove(0));

            // If `batch` and the most recently inserted batch are both empty, we can just fuse them.
            // We can also replace a structurally empty batch with this empty batch, preserving the
            // apparent record count but now with non-trivial lower and upper bounds.
            if batch.as_ref().unwrap().len() == 0 {
                if let Some(position) = self.merging.iter().position(|m| !m.is_vacant()) {
                    if self.merging[position].is_single() && self.merging[position].len() == 0 {
                        self.insert_at(batch.take(), position);
                        let merged = self.complete_at(position);
                        self.merging[position] = MergeState::Single(merged);
                    }
                }
            }

            // Normal insertion for the batch.
            if let Some(batch) = batch {
                let index = batch.len().next_power_of_two();
                self.introduce_batch(Some(batch), index.trailing_zeros() as usize);
            }
        }

        // Having performed all of our work, if we should perform more work reschedule ourselves.
        if self.exert_effort().is_some() {
            if let Some(activator) = &self.activator {
                activator.activate();
            }
        }
    }

    /// Introduces a batch at an indicated level.
    ///
    /// The level indication is often related to the size of the batch, but
    /// it can also be used to artificially fuel the computation by supplying
    /// empty batches at non-trivial indices, to move merges along.
    pub fn introduce_batch(&mut self, batch: Option<B>, batch_index: usize) {

        // Step 0.  Determine an amount of fuel to use for the computation.
        //
        //          Fuel is used to drive maintenance of the data structure,
        //          and in particular is used to make progress through merges
        //          that are in progress. The amount of fuel to use should be
        //          proportional to the number of records introduced, so that
        //          we are guaranteed to complete all merges before they are
        //          required as arguments to merges again.
        //
        //          The fuel use policy is negotiable, in that we might aim
        //          to use relatively less when we can, so that we return
        //          control promptly, or we might account more work to larger
        //          batches. Not clear to me which are best, or if there
        //          should be a configuration knob controlling this.

        // The amount of fuel to use is proportional to 2^batch_index, scaled
        // by a factor of self.effort which determines how eager we are in
        // performing maintenance work. We need to ensure that each merge in
        // progress receives fuel for each introduced batch, and so multiply
        // by that as well.
        if batch_index > 32 { println!("Large batch index: {}", batch_index); }

        // We believe that eight units of fuel is sufficient for each introduced
        // record, accounted as four for each record, and a potential four more
        // for each virtual record associated with promoting existing smaller
        // batches. We could try and make this be less, or be scaled to merges
        // based on their deficit at time of instantiation. For now, we remain
        // conservative.
        let mut fuel = 8 << batch_index;
        // Scale up by the effort parameter, which is calibrated to one as the
        // minimum amount of effort.
        fuel *= self.effort;
        // Convert to an `isize` so we can observe any fuel shortfall.
        let mut fuel = fuel as isize;

        // Step 1.  Apply fuel to each in-progress merge.
        //
        //          Before we can introduce new updates, we must apply any
        //          fuel to in-progress merges, as this fuel is what ensures
        //          that the merges will be complete by the time we insert
        //          the updates.
        self.apply_fuel(&mut fuel);

        // Step 2.  We must ensure the invariant that adjacent layers do not
        //          contain two batches will be satisfied when we insert the
        //          batch. We forcibly complete all merges at layers lower
        //          than and including `batch_index`, so that the new batch
        //          is inserted into an empty layer.
        //
        //          We could relax this to "strictly less than `batch_index`"
        //          if the layer above has only a single batch in it, which
        //          seems not implausible if it has been the focus of effort.
        //
        //          This should be interpreted as the introduction of some
        //          volume of fake updates, and we will need to fuel merges
        //          by a proportional amount to ensure that they are not
        //          surprised later on. The number of fake updates should
        //          correspond to the deficit for the layer, which perhaps
        //          we should track explicitly.
        self.roll_up(batch_index);

        // Step 3. This insertion should be into an empty layer. It is a
        //         logical error otherwise, as we may be violating our
        //         invariant, from which all wonderment derives.
        self.insert_at(batch, batch_index);

        // Step 4. Tidy the largest layers.
        //
        //         It is important that we not tidy only smaller layers,
        //         as their ascension is what ensures the merging and
        //         eventual compaction of the largest layers.
        self.tidy_layers();
    }

    /// Ensures that an insertion at layer `index` will succeed.
    ///
    /// This method is subject to the constraint that all existing batches
    /// should occur at higher levels, which requires it to "roll up" batches
    /// present at lower levels before the method is called. In doing this,
    /// we should not introduce more virtual records than 2^index, as that
    /// is the amount of excess fuel we have budgeted for completing merges.
    fn roll_up(&mut self, index: usize) {

        // Ensure entries sufficient for `index`.
        while self.merging.len() <= index {
            self.merging.push(MergeState::Vacant);
        }

        // We only need to roll up if there are non-vacant layers.
        if self.merging[.. index].iter().any(|m| !m.is_vacant()) {

            // Collect and merge all batches at layers up to but not including `index`.
            let mut merged = None;
            for i in 0 .. index {
                self.insert_at(merged, i);
                merged = self.complete_at(i);
            }

            // The merged results should be introduced at level `index`, which should
            // be ready to absorb them (possibly creating a new merge at the time).
            self.insert_at(merged, index);

            // If the insertion results in a merge, we should complete it to ensure
            // the upcoming insertion at `index` does not panic.
            if self.merging[index].is_double() {
                let merged = self.complete_at(index);
                self.insert_at(merged, index + 1);
            }
        }
    }

    /// Applies an amount of fuel to merges in progress.
    ///
    /// The supplied `fuel` is for each in progress merge, and if we want to spend
    /// the fuel non-uniformly (e.g. prioritizing merges at low layers) we could do
    /// so in order to maintain fewer batches on average (at the risk of completing
    /// merges of large batches later, but tbh probably not much later).
    pub fn apply_fuel(&mut self, fuel: &mut isize) {
        // For the moment our strategy is to apply fuel independently to each merge
        // in progress, rather than prioritizing small merges. This sounds like a
        // great idea, but we need better accounting in place to ensure that merges
        // that borrow against later layers but then complete still "acquire" fuel
        // to pay back their debts.
        for index in 0 .. self.merging.len() {
            // Give each level independent fuel, for now.
            let mut fuel = *fuel;
            // Pass along various logging stuffs, in case we need to report success.
            self.merging[index].work(&mut fuel);
            // `fuel` could have a deficit at this point, meaning we over-spent when
            // we took a merge step. We could ignore this, or maintain the deficit
            // and account future fuel against it before spending again. It isn't
            // clear why that would be especially helpful to do; we might want to
            // avoid overspends at multiple layers in the same invocation (to limit
            // latencies), but there is probably a rich policy space here.

            // If a merge completes, we can immediately merge it in to the next
            // level, which is "guaranteed" to be complete at this point, by our
            // fueling discipline.
            if self.merging[index].is_complete() {
                let complete = self.complete_at(index);
                self.insert_at(complete, index+1);
            }
        }
    }
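/*
A toy model of the "independent fuel per merge" strategy used by `apply_fuel` above. This is a
sketch under simplifying assumptions rather than the real accounting: work is a plain counter,
and the promotion of a completed merge into the next layer is not modeled. The function name
`apply_fuel_uniform` is hypothetical.

```rust
/// `remaining[i]` is the outstanding work of an in-progress merge at layer `i`, or `None`
/// if no merge is in progress there. Every merge receives the same `fuel`, independently
/// of the others. Returns the layers whose merges completed.
fn apply_fuel_uniform(remaining: &mut [Option<usize>], fuel: usize) -> Vec<usize> {
    let mut completed = Vec::new();
    for (index, slot) in remaining.iter_mut().enumerate() {
        if let Some(work) = *slot {
            if work <= fuel {
                // Enough fuel to finish this merge.
                *slot = None;
                completed.push(index);
            } else {
                // Partial progress; the rest waits for the next call.
                *slot = Some(work - fuel);
            }
        }
    }
    completed
}
```

Because each merge gets the full amount, the total work done in one call can be as large as
`fuel` times the number of merges in progress.
*/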

    /// Inserts a batch at a specific location.
    ///
    /// This is a non-public internal method that can panic if we try and insert into a
    /// layer which already contains two batches (and is still in the process of merging).
    fn insert_at(&mut self, batch: Option<B>, index: usize) {
        // Ensure the spine is large enough.
        while self.merging.len() <= index {
            self.merging.push(MergeState::Vacant);
        }

        // Insert the batch at the location.
        match self.merging[index].take() {
            MergeState::Vacant => {
                self.merging[index] = MergeState::Single(batch);
            }
            MergeState::Single(old) => {
                // Log the initiation of a merge.
                self.logger.as_ref().map(|l| l.log(
                    crate::logging::MergeEvent {
                        operator: self.operator.global_id,
                        scale: index,
                        length1: old.as_ref().map(|b| b.len()).unwrap_or(0),
                        length2: batch.as_ref().map(|b| b.len()).unwrap_or(0),
                        complete: None,
                    }
                ));
                let compaction_frontier = self.logical_frontier.borrow();
                self.merging[index] = MergeState::begin_merge(old, batch, compaction_frontier);
            }
            MergeState::Double(_) => {
                panic!("Attempted to insert batch into incomplete merge!")
            }
        };
    }
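/*
The three cases of `insert_at` form a small state machine, sketched here with batch lengths in
place of batches. The `Layer` enum and `insert` function are hypothetical stand-ins, and the
sketch returns an `Err` where the method above would panic.

```rust
/// Toy stand-in for a layer, holding only batch lengths.
#[derive(Debug, PartialEq)]
enum Layer {
    Vacant,
    Single(usize),
    Double(usize, usize),
}

/// Inserting into a vacant layer yields a single batch, inserting into a single batch
/// starts a merge of the old and new batch, and inserting into a merge is an error.
fn insert(layer: Layer, len: usize) -> Result<Layer, &'static str> {
    match layer {
        Layer::Vacant => Ok(Layer::Single(len)),
        Layer::Single(old) => Ok(Layer::Double(old, len)),
        Layer::Double(..) => Err("attempted to insert batch into incomplete merge"),
    }
}
```
*/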

    /// Completes and extracts whatever is at layer `index`.
    fn complete_at(&mut self, index: usize) -> Option<B> {
        if let Some((merged, inputs)) = self.merging[index].complete() {
            if let Some((input1, input2)) = inputs {
                // Log the completion of a merge from existing parts.
                self.logger.as_ref().map(|l| l.log(
                    crate::logging::MergeEvent {
                        operator: self.operator.global_id,
                        scale: index,
                        length1: input1.len(),
                        length2: input2.len(),
                        complete: Some(merged.len()),
                    }
                ));
            }
            Some(merged)
        }
        else {
            None
        }
    }

    /// Attempts to draw down large layers to size-appropriate layers.
    fn tidy_layers(&mut self) {

        // If the largest layer is complete (not merging), we can attempt
        // to draw it down to the next layer. This is permitted if we can
        // maintain our invariant that below each merge there are at most
        // half the records that would be required to invade the merge.
        if !self.merging.is_empty() {
            let mut length = self.merging.len();
            if self.merging[length-1].is_single() {

                // To move a batch down, we require that it contain few
                // enough records that the lower level is appropriate,
                // and that moving the batch would not create a merge
                // violating our invariant.

                let appropriate_level = self.merging[length-1].len().next_power_of_two().trailing_zeros() as usize;

                // Continue only as far as is appropriate
                while appropriate_level < length-1 {

                    match self.merging[length-2].take() {
                        // Vacant or structurally empty batches can be absorbed.
                        MergeState::Vacant | MergeState::Single(None) => {
                            self.merging.remove(length-2);
                            length = self.merging.len();
                        }
                        // Single batches may initiate a merge, if sizes are
                        // within bounds, but terminate the loop either way.
                        MergeState::Single(Some(batch)) => {

                            // Determine the number of records that might lead
                            // to a merge. Importantly, this is not the number
                            // of actual records, but the sum of upper bounds
                            // based on indices.
                            let mut smaller = 0;
                            for (index, batch) in self.merging[..(length-2)].iter().enumerate() {
                                match batch {
                                    MergeState::Vacant => { },
                                    MergeState::Single(_) => { smaller += 1 << index; },
                                    MergeState::Double(_) => { smaller += 2 << index; },
                                }
                            }

                            if smaller <= (1 << length) / 8 {
                                self.merging.remove(length-2);
                                self.insert_at(Some(batch), length-2);
                            }
                            else {
                                self.merging[length-2] = MergeState::Single(Some(batch));
                            }
                            return;
                        }
                        // If a merge is in progress there is nothing to do.
                        MergeState::Double(state) => {
                            self.merging[length-2] = MergeState::Double(state);
                            return;
                        }
                    }
                }
            }
        }
    }
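/*
The threshold test inside `tidy_layers` can be isolated as below. This is a sketch with
hypothetical names (`Slot`, `nominal_records`, `may_draw_down`). It reproduces only the
arithmetic on nominal sizes, not the movement of batches between layers.

```rust
/// Toy stand-in for `MergeState`, carrying no batches.
enum Slot {
    Vacant,
    Single,
    Double,
}

/// Sum of the nominal sizes of the `lower` layers: 2^index per batch, regardless of
/// how many updates each batch actually holds.
fn nominal_records(lower: &[Slot]) -> usize {
    lower
        .iter()
        .enumerate()
        .map(|(index, slot)| match slot {
            Slot::Vacant => 0usize,
            Slot::Single => 1usize << index,
            Slot::Double => 2usize << index,
        })
        .sum()
}

/// The comparison used above for a spine with `length` layers.
fn may_draw_down(lower: &[Slot], length: usize) -> bool {
    nominal_records(lower) <= (1usize << length) / 8
}
```

For instance, a single batch at layer 0 and a merge at layer 2 count as 1 + 8 = 9 nominal
records, which exceeds the threshold of 4 for a spine of five layers.
*/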
}


/// Describes the state of a layer.
///
/// A layer can be empty, contain a single batch, or contain a pair of batches
/// that are in the process of merging into a batch for the next layer.
enum MergeState<B: Batch> {
    /// An empty layer, containing no updates.
    Vacant,
    /// A layer containing a single batch.
    ///
    /// The `None` variant is used to represent a structurally empty batch present
    /// to ensure the progress of maintenance work.
    Single(Option<B>),
    /// A layer containing two batches, in the process of merging.
    Double(MergeVariant<B>),
}
767
768impl<B: Batch> MergeState<B> where B::Time: Eq {
769
770 /// The number of actual updates contained in the level.
    fn len(&self) -> usize {
        match self {
            MergeState::Single(Some(b)) => b.len(),
            MergeState::Double(MergeVariant::InProgress(b1,b2,_)) => b1.len() + b2.len(),
            MergeState::Double(MergeVariant::Complete(Some((b, _)))) => b.len(),
            _ => 0,
        }
    }

    /// True only for the MergeState::Vacant variant.
    fn is_vacant(&self) -> bool {
        matches!(self, MergeState::Vacant)
    }

    /// True only for the MergeState::Single variant.
    fn is_single(&self) -> bool {
        matches!(self, MergeState::Single(_))
    }

    /// True only for the MergeState::Double variant.
    fn is_double(&self) -> bool {
        matches!(self, MergeState::Double(_))
    }

    /// Immediately complete any merge.
    ///
    /// The result is either a batch, if there is a non-trivial batch to return,
    /// or `None` if there is no meaningful batch to return. The return value does
    /// not distinguish between Vacant entries and structurally empty batches; a
    /// caller that needs to tell them apart should check `is_complete()` first,
    /// because this method leaves the layer Vacant.
    ///
    /// When the batch is the result of a merge, the two input batches are
    /// returned alongside it.
    fn complete(&mut self) -> Option<(B, Option<(B, B)>)> {
        match std::mem::replace(self, MergeState::Vacant) {
            MergeState::Vacant => None,
            MergeState::Single(batch) => batch.map(|b| (b, None)),
            MergeState::Double(variant) => variant.complete(),
        }
    }

    /// True iff the layer is a complete merge, ready for extraction.
    fn is_complete(&self) -> bool {
        matches!(self, MergeState::Double(MergeVariant::Complete(_)))
    }

    /// Performs a bounded amount of work towards a merge.
    ///
    /// Nothing is returned. If the merge completes, the layer becomes
    /// `MergeState::Double(MergeVariant::Complete(_))`, and it is the
    /// obligation of the caller to extract the result (for example with
    /// `complete()`) and to correctly install it.
    fn work(&mut self, fuel: &mut isize) {
        // We only perform work for merges in progress.
        if let MergeState::Double(layer) = self {
            layer.work(fuel)
        }
    }

    /// Extract the merge state, typically temporarily.
    fn take(&mut self) -> Self {
        std::mem::replace(self, MergeState::Vacant)
    }

    /// Initiates the merge of an "old" batch with a "new" batch.
    ///
    /// The upper frontier of the old batch should match the lower
    /// frontier of the new batch, with the resulting batch describing
    /// their composed interval, from the lower frontier of the old
    /// batch to the upper frontier of the new batch.
    ///
    /// Either batch may be `None`, which corresponds to a structurally
    /// empty batch whose upper and lower frontiers are equal. This
    /// option exists purely for bookkeeping purposes, and when either
    /// batch is `None` no computation is performed to merge the two.
    fn begin_merge(batch1: Option<B>, batch2: Option<B>, compaction_frontier: AntichainRef<B::Time>) -> MergeState<B> {
        let variant =
        match (batch1, batch2) {
            (Some(batch1), Some(batch2)) => {
                assert!(batch1.upper() == batch2.lower());
                let begin_merge = <B as Batch>::begin_merge(&batch1, &batch2, compaction_frontier);
                MergeVariant::InProgress(batch1, batch2, begin_merge)
            }
            // With at most one actual batch the merge is trivially complete,
            // and there are no input batches to report.
            (None, Some(x)) => MergeVariant::Complete(Some((x, None))),
            (Some(x), None) => MergeVariant::Complete(Some((x, None))),
            (None, None) => MergeVariant::Complete(None),
        };

        MergeState::Double(variant)
    }
}

/// Describes the merge occupying a `MergeState::Double` layer, which is
/// either still in progress or already complete.
enum MergeVariant<B: Batch> {
    /// Describes an actual in-progress merge between two non-trivial batches.
    InProgress(B, B, <B as Batch>::Merger),
    /// A merge that requires no further work. May or may not represent a non-trivial batch.
    Complete(Option<(B, Option<(B, B)>)>),
}

impl<B: Batch> MergeVariant<B> {

    /// Completes and extracts the batch, unless structurally empty.
    ///
    /// The result is either `None`, for structurally empty batches,
    /// or a batch and optionally the input batches from which it derived.
    fn complete(mut self) -> Option<(B, Option<(B, B)>)> {
        // Supply effectively unlimited fuel so that the merge runs to completion.
        let mut fuel = isize::MAX;
        self.work(&mut fuel);
        if let MergeVariant::Complete(batch) = self { batch }
        else { panic!("Failed to complete a merge!"); }
    }

    /// Applies some amount of work, potentially completing the merge.
    ///
    /// Nothing is returned. If the work completes, the variant becomes
    /// `Complete`, holding the merged batch together with its two source
    /// batches. This allows the caller to manage the released resources.
    fn work(&mut self, fuel: &mut isize) {
        // Temporarily move the variant out so that its contents can be taken by value.
        let variant = std::mem::replace(self, MergeVariant::Complete(None));
        if let MergeVariant::InProgress(b1,b2,mut merge) = variant {
            merge.work(&b1,&b2,fuel);
            // Fuel left over after `work` means the merger has finished.
            if *fuel > 0 {
                *self = MergeVariant::Complete(Some((merge.done(), Some((b1,b2)))));
            }
            else {
                *self = MergeVariant::InProgress(b1,b2,merge);
            }
        }
        else {
            *self = variant;
        }
    }
}