1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478
//! Traits and datastructures representing a collection trace.
//! A collection trace is a set of updates of the form `(key, val, time, diff)`, which determine the contents
//! of a collection at given times by accumulating updates whose time field is less or equal to the target field.
//! The `Trace` trait describes those types and methods that a data structure must implement to be viewed as a
//! collection trace. This trait allows operator implementations to be generic with respect to the type of trace,
//! and allows various data structures to be interpretable as multiple different types of trace.
pub mod cursor;
pub mod description;
pub mod implementations;
pub mod wrappers;
use timely::progress::{Antichain, frontier::AntichainRef};
use timely::progress::Timestamp;
use crate::logging::Logger;
use crate::difference::Semigroup;
use crate::IntoOwned;
use crate::lattice::Lattice;
pub use self::cursor::Cursor;
pub use self::description::Description;
/// A type used to express how much effort a trace should exert even in the absence of updates.
pub type ExertionLogic = std::sync::Arc<dyn for<'a> Fn(&'a [(usize, usize, usize)])->Option<usize>+Send+Sync>;
// The traces and batch and cursors want the flexibility to appear as if they manage certain types of keys and
// values and such, while perhaps using other representations, I'm thinking mostly of wrappers around the keys
// and vals that change the `Ord` implementation, or stash hash codes, or the like.
// This complicates what requirements we make so that the trace is still usable by someone who knows only about
// the base key and value types. For example, the complex types should likely dereference to the simpler types,
// so that the user can make sense of the result as if they were given references to the simpler types. At the
// same time, the collection should be formable from base types (perhaps we need an `Into` or `From` constraint)
// and we should, somehow, be able to take a reference to the simple types to compare against the more complex
// types. This second one is also like an `Into` or `From` constraint, except that we start with a reference and
// really don't need anything more complex than a reference, but we can't form an owned copy of the complex type
// without cloning it.
// We could just start by cloning things. Worry about wrapping references later on.
/// A trace whose contents may be read.
/// This is a restricted interface to the more general `Trace` trait, which extends this trait with further methods
/// to update the contents of the trace. These methods are used to examine the contents, and to update the reader's
/// capabilities (which may release restrictions on the mutations to the underlying trace and cause work to happen).
pub trait TraceReader {
/// Key by which updates are indexed.
type Key<'a>: Copy + Clone + Ord;
/// Values associated with keys.
type Val<'a>: Copy + Clone;
/// Timestamps associated with updates
type Time: Timestamp + Lattice + Ord + Clone;
/// Borrowed form of timestamp.
type TimeGat<'a>: Copy + IntoOwned<'a, Owned = Self::Time>;
/// Owned form of update difference.
type Diff: Semigroup + 'static;
/// Borrowed form of update difference.
type DiffGat<'a> : Copy + IntoOwned<'a, Owned = Self::Diff>;
/// The type of an immutable collection of updates.
type Batch: for<'a> BatchReader<Key<'a> = Self::Key<'a>, Val<'a> = Self::Val<'a>, Time = Self::Time, TimeGat<'a> = Self::TimeGat<'a>, Diff = Self::Diff, DiffGat<'a> = Self::DiffGat<'a>>+Clone+'static;
/// Storage type for `Self::Cursor`. Likely related to `Self::Batch`.
type Storage;
/// The type used to enumerate the collections contents.
type Cursor: for<'a> Cursor<Storage=Self::Storage, Key<'a> = Self::Key<'a>, Val<'a> = Self::Val<'a>, Time = Self::Time, TimeGat<'a> = Self::TimeGat<'a>, Diff = Self::Diff, DiffGat<'a> = Self::DiffGat<'a>>;
/// Provides a cursor over updates contained in the trace.
fn cursor(&mut self) -> (Self::Cursor, Self::Storage) {
if let Some(cursor) = self.cursor_through(Antichain::new().borrow()) {
else {
panic!("unable to acquire complete cursor for trace; is it closed?");
/// Acquires a cursor to the restriction of the collection's contents to updates at times not greater or
/// equal to an element of `upper`.
/// This method is expected to work if called with an `upper` that (i) was an observed bound in batches from
/// the trace, and (ii) the trace has not been advanced beyond `upper`. Practically, the implementation should
/// be expected to look for a "clean cut" using `upper`, and if it finds such a cut can return a cursor. This
/// should allow `upper` such as `&[]` as used by `self.cursor()`, though it is difficult to imagine other uses.
fn cursor_through(&mut self, upper: AntichainRef<Self::Time>) -> Option<(Self::Cursor, Self::Storage)>;
/// Advances the frontier that constrains logical compaction.
/// Logical compaction is the ability of the trace to change the times of the updates it contains.
/// Update times may be changed as long as their comparison to all query times beyond the logical compaction
/// frontier remains unchanged. Practically, this means that groups of timestamps not beyond the frontier can
/// be coalesced into fewer representative times.
/// Logical compaction is important, as it allows the trace to forget historical distinctions between update
/// times, and maintain a compact memory footprint over an unbounded update history.
/// By advancing the logical compaction frontier, the caller unblocks merging of otherwise equivalent updates,
/// but loses the ability to observe historical detail that is not beyond `frontier`.
/// It is an error to call this method with a frontier not equal to or beyond the most recent arguments to
/// this method, or the initial value of `get_logical_compaction()` if this method has not yet been called.
fn set_logical_compaction(&mut self, frontier: AntichainRef<Self::Time>);
/// Deprecated form of `set_logical_compaction`.
#[deprecated(since = "0.11", note = "please use `set_logical_compaction`")]
fn advance_by(&mut self, frontier: AntichainRef<Self::Time>) {
/// Reports the logical compaction frontier.
/// All update times beyond this frontier will be presented with their original times, and all update times
/// not beyond this frontier will present as a time that compares identically with all query times beyond
/// this frontier. Practically, update times not beyond this frontier should not be taken to be accurate as
/// presented, and should be used carefully, only in accumulation to times that are beyond the frontier.
fn get_logical_compaction(&mut self) -> AntichainRef<Self::Time>;
/// Deprecated form of `get_logical_compaction`.
#[deprecated(since = "0.11", note = "please use `get_logical_compaction`")]
fn advance_frontier(&mut self) -> AntichainRef<Self::Time> {
/// Advances the frontier that constrains physical compaction.
/// Physical compaction is the ability of the trace to merge the batches of updates it maintains. Physical
/// compaction does not change the updates or their timestamps, although it is also the moment at which
/// logical compaction is most likely to happen.
/// Physical compaction allows the trace to maintain a logarithmic number of batches of updates, which is
/// what allows the trace to provide efficient random access by keys and values.
/// By advancing the physical compaction frontier, the caller unblocks the merging of batches of updates,
/// but loses the ability to create a cursor through any frontier not beyond `frontier`.
/// It is an error to call this method with a frontier not equal to or beyond the most recent arguments to
/// this method, or the initial value of `get_physical_compaction()` if this method has not yet been called.
fn set_physical_compaction(&mut self, frontier: AntichainRef<Self::Time>);
/// Deprecated form of `set_physical_compaction`.
#[deprecated(since = "0.11", note = "please use `set_physical_compaction`")]
fn distinguish_since(&mut self, frontier: AntichainRef<Self::Time>) {
/// Reports the physical compaction frontier.
/// All batches containing updates beyond this frontier will not be merged with other batches. This allows
/// the caller to create a cursor through any frontier beyond the physical compaction frontier, with the
/// `cursor_through()` method. This functionality is primarily of interest to the `join` operator, and any
/// other operators who need to take notice of the physical structure of update batches.
fn get_physical_compaction(&mut self) -> AntichainRef<Self::Time>;
/// Deprecated form of `get_physical_compaction`.
#[deprecated(since = "0.11", note = "please use `get_physical_compaction`")]
fn distinguish_frontier(&mut self) -> AntichainRef<Self::Time> {
/// Maps logic across the non-empty sequence of batches in the trace.
/// This is currently used only to extract historical data to prime late-starting operators who want to reproduce
/// the stream of batches moving past the trace. It could also be a fine basis for a default implementation of the
/// cursor methods, as they (by default) just move through batches accumulating cursors into a cursor list.
fn map_batches<F: FnMut(&Self::Batch)>(&self, f: F);
/// Reads the upper frontier of committed times.
fn read_upper(&mut self, target: &mut Antichain<Self::Time>) {
target.insert(<Self::Time as timely::progress::Timestamp>::minimum());
self.map_batches(|batch| {
/// Advances `upper` by any empty batches.
/// An empty batch whose `batch.lower` bound equals the current
/// contents of `upper` will advance `upper` to `batch.upper`.
/// Taken across all batches, this should advance `upper` across
/// empty batch regions.
fn advance_upper(&mut self, upper: &mut Antichain<Self::Time>) {
self.map_batches(|batch| {
if batch.is_empty() && batch.lower() == upper {
/// An append-only collection of `(key, val, time, diff)` tuples.
/// The trace must pretend to look like a collection of `(Key, Val, Time, isize)` tuples, but is permitted
/// to introduce new types `KeyRef`, `ValRef`, and `TimeRef` which can be dereference to the types above.
/// The trace must be constructable from, and navigable by the `Key`, `Val`, `Time` types, but does not need
/// to return them.
pub trait Trace : TraceReader
where <Self as TraceReader>::Batch: Batch {
/// Allocates a new empty trace.
fn new(
info: ::timely::dataflow::operators::generic::OperatorInfo,
logging: Option<crate::logging::Logger>,
activator: Option<timely::scheduling::activate::Activator>,
) -> Self;
/// Exert merge effort, even without updates.
fn exert(&mut self);
/// Sets the logic for exertion in the absence of updates.
/// The function receives an iterator over batch levels, from large to small, as triples `(level, count, length)`,
/// indicating the level, the number of batches, and their total length in updates. It should return a number of
/// updates to perform, or `None` if no work is required.
fn set_exert_logic(&mut self, logic: ExertionLogic);
/// Introduces a batch of updates to the trace.
/// Batches describe the time intervals they contain, and they should be added to the trace in contiguous
/// intervals. If a batch arrives with a lower bound that does not equal the upper bound of the most recent
/// addition, the trace will add an empty batch. It is an error to then try to populate that region of time.
/// This restriction could be relaxed, especially if we discover ways in which batch interval order could
/// commute. For now, the trace should complain, to the extent that it cares about contiguous intervals.
fn insert(&mut self, batch: Self::Batch);
/// Introduces an empty batch concluding the trace.
/// This method should be logically equivalent to introducing an empty batch whose lower frontier equals
/// the upper frontier of the most recently introduced batch, and whose upper frontier is empty.
fn close(&mut self);
/// A batch of updates whose contents may be read.
/// This is a restricted interface to batches of updates, which support the reading of the batch's contents,
/// but do not expose ways to construct the batches. This trait is appropriate for views of the batch, and is
/// especially useful for views derived from other sources in ways that prevent the construction of batches
/// from the type of data in the view (for example, filtered views, or views with extended time coordinates).
pub trait BatchReader
Self: ::std::marker::Sized,
/// Key by which updates are indexed.
type Key<'a>: Copy + Clone + Ord;
/// Values associated with keys.
type Val<'a>: Copy + Clone;
/// Timestamps associated with updates
type Time: Timestamp + Lattice + Ord + Clone;
/// Borrowed form of timestamp.
type TimeGat<'a>: Copy + IntoOwned<'a, Owned = Self::Time>;
/// Owned form of update difference.
type Diff: Semigroup + 'static;
/// Borrowed form of update difference.
type DiffGat<'a> : Copy + IntoOwned<'a, Owned = Self::Diff>;
/// The type used to enumerate the batch's contents.
type Cursor: for<'a> Cursor<Storage=Self, Key<'a> = Self::Key<'a>, Val<'a> = Self::Val<'a>, Time = Self::Time, TimeGat<'a> = Self::TimeGat<'a>, Diff = Self::Diff, DiffGat<'a> = Self::DiffGat<'a>>;
/// Acquires a cursor to the batch's contents.
fn cursor(&self) -> Self::Cursor;
/// The number of updates in the batch.
fn len(&self) -> usize;
/// True if the batch is empty.
fn is_empty(&self) -> bool { self.len() == 0 }
/// Describes the times of the updates in the batch.
fn description(&self) -> &Description<Self::Time>;
/// All times in the batch are greater or equal to an element of `lower`.
fn lower(&self) -> &Antichain<Self::Time> { self.description().lower() }
/// All times in the batch are not greater or equal to any element of `upper`.
fn upper(&self) -> &Antichain<Self::Time> { self.description().upper() }
/// An immutable collection of updates.
pub trait Batch : BatchReader where Self: ::std::marker::Sized {
/// A type used to progressively merge batches.
type Merger: Merger<Self>;
/// Initiates the merging of consecutive batches.
/// The result of this method can be exercised to eventually produce the same result
/// that a call to `self.merge(other)` would produce, but it can be done in a measured
/// fashion. This can help to avoid latency spikes where a large merge needs to happen.
fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef<Self::Time>) -> Self::Merger {
Self::Merger::new(self, other, compaction_frontier)
/// Produce an empty batch over the indicated interval.
fn empty(lower: Antichain<Self::Time>, upper: Antichain<Self::Time>) -> Self;
/// Functionality for collecting and batching updates.
pub trait Batcher {
/// Type pushed into the batcher.
type Input;
/// Type produced by the batcher.
type Output;
/// Times at which batches are formed.
type Time: Timestamp;
/// Allocates a new empty batcher.
fn new(logger: Option<Logger>, operator_id: usize) -> Self;
/// Adds an unordered container of elements to the batcher.
fn push_container(&mut self, batch: &mut Self::Input);
/// Returns all updates not greater or equal to an element of `upper`.
fn seal<B: Builder<Input=Self::Output, Time=Self::Time>>(&mut self, upper: Antichain<Self::Time>) -> B::Output;
/// Returns the lower envelope of contained update times.
fn frontier(&mut self) -> timely::progress::frontier::AntichainRef<Self::Time>;
/// Functionality for building batches from ordered update sequences.
pub trait Builder: Sized {
/// Input item type.
type Input;
/// Timestamp type.
type Time: Timestamp;
/// Output batch type.
type Output;
/// Allocates an empty builder.
/// Ideally we deprecate this and insist all non-trivial building happens via `with_capacity()`.
// #[deprecated]
fn new() -> Self { Self::with_capacity(0, 0, 0) }
/// Allocates an empty builder with capacity for the specified keys, values, and updates.
/// They represent respectively the number of distinct `key`, `(key, val)`, and total updates.
fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self;
/// Adds a chunk of elements to the batch.
/// Adds all elements from `chunk` to the builder and leaves `chunk` in an undefined state.
fn push(&mut self, chunk: &mut Self::Input);
/// Completes building and returns the batch.
fn done(self, description: Description<Self::Time>) -> Self::Output;
/// Builds a batch from a chain of updates corresponding to the indicated lower and upper bounds.
/// This method relies on the chain only containing updates greater or equal to the lower frontier,
/// and not greater or equal to the upper frontier, as encoded in the description. Chains must also
/// be sorted and consolidated.
fn seal(chain: &mut Vec<Self::Input>, description: Description<Self::Time>) -> Self::Output;
/// Represents a merge in progress.
pub trait Merger<Output: Batch> {
/// Creates a new merger to merge the supplied batches, optionally compacting
/// up to the supplied frontier.
fn new(source1: &Output, source2: &Output, compaction_frontier: AntichainRef<Output::Time>) -> Self;
/// Perform some amount of work, decrementing `fuel`.
/// If `fuel` is non-zero after the call, the merging is complete and
/// one should call `done` to extract the merged results.
fn work(&mut self, source1: &Output, source2: &Output, fuel: &mut isize);
/// Extracts merged results.
/// This method should only be called after `work` has been called and
/// has not brought `fuel` to zero. Otherwise, the merge is still in
/// progress.
fn done(self) -> Output;
/// Blanket implementations for reference counted batches.
pub mod rc_blanket_impls {
use std::rc::Rc;
use timely::progress::{Antichain, frontier::AntichainRef};
use super::{Batch, BatchReader, Builder, Merger, Cursor, Description};
impl<B: BatchReader> BatchReader for Rc<B> {
type Key<'a> = B::Key<'a>;
type Val<'a> = B::Val<'a>;
type Time = B::Time;
type TimeGat<'a> = B::TimeGat<'a>;
type Diff = B::Diff;
type DiffGat<'a> = B::DiffGat<'a>;
/// The type used to enumerate the batch's contents.
type Cursor = RcBatchCursor<B::Cursor>;
/// Acquires a cursor to the batch's contents.
fn cursor(&self) -> Self::Cursor {
/// The number of updates in the batch.
fn len(&self) -> usize { (**self).len() }
/// Describes the times of the updates in the batch.
fn description(&self) -> &Description<Self::Time> { (**self).description() }
/// Wrapper to provide cursor to nested scope.
pub struct RcBatchCursor<C> {
cursor: C,
impl<C> RcBatchCursor<C> {
fn new(cursor: C) -> Self {
RcBatchCursor {
impl<C: Cursor> Cursor for RcBatchCursor<C> {
type Key<'a> = C::Key<'a>;
type Val<'a> = C::Val<'a>;
type Time = C::Time;
type TimeGat<'a> = C::TimeGat<'a>;
type Diff = C::Diff;
type DiffGat<'a> = C::DiffGat<'a>;
type Storage = Rc<C::Storage>;
#[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(storage) }
#[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(storage) }
#[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { self.cursor.key(storage) }
#[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(storage) }
fn map_times<L: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, logic: L) {
self.cursor.map_times(storage, logic)
#[inline] fn step_key(&mut self, storage: &Self::Storage) { self.cursor.step_key(storage) }
#[inline] fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) { self.cursor.seek_key(storage, key) }
#[inline] fn step_val(&mut self, storage: &Self::Storage) { self.cursor.step_val(storage) }
#[inline] fn seek_val(&mut self, storage: &Self::Storage, val: Self::Val<'_>) { self.cursor.seek_val(storage, val) }
#[inline] fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind_keys(storage) }
#[inline] fn rewind_vals(&mut self, storage: &Self::Storage) { self.cursor.rewind_vals(storage) }
/// An immutable collection of updates.
impl<B: Batch> Batch for Rc<B> {
type Merger = RcMerger<B>;
fn empty(lower: Antichain<Self::Time>, upper: Antichain<Self::Time>) -> Self {
Rc::new(B::empty(lower, upper))
/// Wrapper type for building reference counted batches.
pub struct RcBuilder<B: Builder> { builder: B }
/// Functionality for building batches from ordered update sequences.
impl<B: Builder> Builder for RcBuilder<B> {
type Input = B::Input;
type Time = B::Time;
type Output = Rc<B::Output>;
fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self { RcBuilder { builder: B::with_capacity(keys, vals, upds) } }
fn push(&mut self, input: &mut Self::Input) { self.builder.push(input) }
fn done(self, description: Description<Self::Time>) -> Rc<B::Output> { Rc::new(self.builder.done(description)) }
fn seal(chain: &mut Vec<Self::Input>, description: Description<Self::Time>) -> Self::Output {
Rc::new(B::seal(chain, description))
/// Wrapper type for merging reference counted batches.
pub struct RcMerger<B:Batch> { merger: B::Merger }
/// Represents a merge in progress.
impl<B:Batch> Merger<Rc<B>> for RcMerger<B> {
fn new(source1: &Rc<B>, source2: &Rc<B>, compaction_frontier: AntichainRef<B::Time>) -> Self { RcMerger { merger: B::begin_merge(source1, source2, compaction_frontier) } }
fn work(&mut self, source1: &Rc<B>, source2: &Rc<B>, fuel: &mut isize) { self.merger.work(source1, source2, fuel) }
fn done(self) -> Rc<B> { Rc::new(self.merger.done()) }