Trait persist::operators::upsert::PersistentUpsert

pub trait PersistentUpsert<G, K: Codec, V: Codec, T> {
    fn persistent_upsert(
        &self,
        name: &str,
        as_of_frontier: Antichain<u64>,
        persist_config: PersistentUpsertConfig<K, V>
    ) -> (Stream<G, ((K, V), u64, isize)>, Stream<G, (String, u64, isize)>)
    where
        G: Scope<Timestamp = u64>;
}

Extension trait for Stream.

Required methods

Turns a stream of keyed upserts into a stream of differential updates and also persists upserts to the StreamWriteHandle given in persist_config.

The first output stream is the stream of differential updates while the second output stream contains persistence errors.

The input is a stream of (Key, Option<Val>, timestamp) tuples, where “timestamp” expresses a happens-before relationship and could, for example, be a Kafka offset. The contents of the collection are defined key-by-key, where each optional value in sequence either replaces or removes the existing value, should it exist.
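The key-by-key semantics described above can be illustrated with a minimal standalone sketch. It does not use the persist crate or timely dataflow: `upserts_to_updates` and the `Update` alias are hypothetical names introduced only for this example, keys and values are assumed to be `String`s, and the input is assumed to be already sorted by timestamp.

```rust
use std::collections::HashMap;

/// One differential update: ((key, value), timestamp, diff).
pub type Update = ((String, String), u64, isize);

/// Hypothetical helper (not part of the persist crate): folds a sequence of
/// upserts, assumed to be sorted by timestamp, into differential updates.
pub fn upserts_to_updates(upserts: &[(String, Option<String>, u64)]) -> Vec<Update> {
    // The current value for each key, as defined by the upserts seen so far.
    let mut current: HashMap<String, String> = HashMap::new();
    let mut out = Vec::new();
    for (key, value, ts) in upserts {
        // Retract the previous value for this key, should it exist.
        if let Some(old) = current.remove(key) {
            out.push(((key.clone(), old), *ts, -1));
        }
        // `Some` installs the new value; `None` is a pure removal.
        if let Some(new) = value {
            current.insert(key.clone(), new.clone());
            out.push(((key.clone(), new.clone()), *ts, 1));
        }
    }
    out
}
```

In this sketch, an upsert that repeats the existing value emits a retraction and an addition at the same timestamp, which cancel out once the output is consolidated.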

The as_of_frontier indicates a frontier that can be used to compact input timestamps without affecting the results. It should be applied, both because doing so improves performance and because potentially incorrect results would otherwise be visible in sinks.
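As a rough illustration of what compacting input timestamps can mean for totally ordered `u64` times, the sketch below advances every timestamp to at least a single `as_of` value and keeps only the latest upsert per key at each resulting time. This is an assumption-laden model, not the method's actual implementation: `compact_upserts` is a hypothetical helper, and the frontier is simplified to one `u64` rather than an `Antichain<u64>`.

```rust
use std::collections::BTreeMap;

/// Hypothetical helper (not part of the persist crate): advances each
/// timestamp to at least `as_of` and keeps, per (time, key), only the
/// upsert that was latest in the original timestamp order.
pub fn compact_upserts(
    upserts: &[(String, Option<String>, u64)],
    as_of: u64,
) -> Vec<(String, Option<String>, u64)> {
    // Sort by original timestamp so that later upserts overwrite earlier
    // ones; `sort_by_key` is stable, so ties keep their input order.
    let mut sorted = upserts.to_vec();
    sorted.sort_by_key(|(_, _, ts)| *ts);

    let mut latest: BTreeMap<(u64, String), Option<String>> = BTreeMap::new();
    for (key, value, ts) in sorted {
        // Times before `as_of` collapse onto `as_of`; later times are kept.
        let advanced = ts.max(as_of);
        latest.insert((advanced, key), value);
    }

    // The result is ordered by (time, key).
    latest
        .into_iter()
        .map(|((ts, key), value)| (key, value, ts))
        .collect()
}
```

Under this model, every upsert for a key before `as_of` except the last one is dropped, while the contents of the collection at any time at or beyond `as_of` are unchanged.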

This method is only implemented for totally ordered times, as we do not yet understand what a “sequence” of upserts would mean for partially ordered timestamps.

This method maintains important invariants for the persisted collection pointed to by persist_config. Any other actor that interacts with the collection must also uphold them. The invariants apply only to consolidated data, that is, when all diffs are summed up for a given key/value pair. The invariants are:

  • Each update in the collection must have a diff of 1 or 0. That is, an update either exists exactly once or it doesn’t exist.

  • For each key, there can only be one value that it maps to. That is, keys must be unique.
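The two invariants can be checked on a batch of updates with a small standalone sketch. `check_invariants` is a hypothetical helper, not something the persist crate provides. It assumes `String` keys and values and, following the wording above, sums diffs per key/value pair across all timestamps.

```rust
use std::collections::HashMap;

/// Hypothetical checker (not part of the persist crate): consolidates the
/// updates and verifies both invariants on the result.
pub fn check_invariants(updates: &[((String, String), u64, isize)]) -> Result<(), String> {
    // Consolidate: sum the diffs for each (key, value) pair.
    let mut sums: HashMap<(&str, &str), isize> = HashMap::new();
    for ((k, v), _ts, diff) in updates {
        *sums.entry((k.as_str(), v.as_str())).or_insert(0) += *diff;
    }

    // Tracks the single live value seen so far for each key.
    let mut live: HashMap<&str, &str> = HashMap::new();
    for ((k, v), sum) in sums {
        match sum {
            // The pair does not exist in the consolidated collection.
            0 => {}
            // The pair exists exactly once; its key must not already be live.
            1 => {
                if let Some(other) = live.insert(k, v) {
                    return Err(format!("key {} maps to both {} and {}", k, other, v));
                }
            }
            // Any other consolidated diff violates the first invariant.
            n => return Err(format!("({}, {}) has consolidated diff {}", k, v, n)),
        }
    }
    Ok(())
}
```

A batch that retracts the old value before adding the new one passes the check, while a batch that adds a second value for a key without a retraction fails it.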

Note: This method only persists upserts; it does not seal them. Use it together with seal() to also seal the persistent collection.

Implementations on Foreign Types

Implementors