Module differential_dataflow::operators::arrange::upsert

Support for forming collections from streams of upserts.

Upserts are sequences of keyed optional values, and they define a collection of pairs of each key and its most recent value, if that value is present. Each element in the sequence effectively overwrites the previous value at its key, if present; an element whose value is absent uninstalls the key.
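The overwrite-or-uninstall rule above can be sketched with plain standard-library types. This is only a model of the semantics, not the operator's implementation; the function name `collection_from_upserts` and the use of a `BTreeMap` are assumptions made for illustration.

```rust
use std::collections::BTreeMap;

/// Folds a sequence of upserts into the collection it defines.
/// `Some(val)` installs or overwrites the value at `key`;
/// `None` uninstalls `key`.
fn collection_from_upserts(upserts: &[(&str, Option<&str>)]) -> Vec<(String, String)> {
    let mut state: BTreeMap<String, String> = BTreeMap::new();
    for (key, opt_val) in upserts {
        match opt_val {
            // A present value replaces whatever was stored at the key.
            Some(val) => {
                state.insert(key.to_string(), val.to_string());
            }
            // An absent value removes the key, if it was installed.
            None => {
                state.remove(*key);
            }
        }
    }
    // The surviving (key, value) pairs, in key order.
    state.into_iter().collect()
}
```

Applied to the sequence `("a", Some("1"))`, `("b", Some("2"))`, `("a", Some("3"))`, `("b", None)`, only the pair for `"a"` with its most recent value `"3"` remains.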

Upserts are non-trivial because they do not themselves describe the deletions that the Collection update stream must present. However, if one creates an Arrangement, its state provides sufficient information to produce them. The arrangement will continue to exist, even if dropped, until the input or dataflow shuts down, as the upsert operator itself needs access to its accumulated state.
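To see why accumulated state is enough to recover the missing deletions, consider the following minimal sketch. It keeps the current value per key and, for each upsert, emits the `(key, value, diff)` updates a collection would need. The name `updates_for_upsert`, the `i64` difference type, and the `BTreeMap` state are hypothetical stand-ins; the real operator consults an arrangement and also tracks timestamps.

```rust
use std::collections::BTreeMap;

/// Applies one upsert to `state` and returns the `(key, value, diff)`
/// updates implied by it: a retraction (-1) of the old value, if any,
/// and an insertion (+1) of the new value, if any.
fn updates_for_upsert(
    state: &mut BTreeMap<String, String>,
    key: &str,
    new_val: Option<&str>,
) -> Vec<(String, String, i64)> {
    let mut updates = Vec::new();
    let old_val = state.get(key).cloned();
    // Repeating the current value (or removing an absent key) changes nothing.
    if old_val.as_deref() == new_val {
        return updates;
    }
    // The retraction is read from the state, not from the input.
    if let Some(old) = old_val {
        updates.push((key.to_string(), old, -1));
    }
    match new_val {
        Some(new) => {
            updates.push((key.to_string(), new.to_string(), 1));
            state.insert(key.to_string(), new.to_string());
        }
        None => {
            state.remove(key);
        }
    }
    updates
}
```

Starting from an empty state, upserting `"frank"` to `"mcsherry"` and then to `"zappa"` yields one insertion for the first call and a retraction of `"mcsherry"` plus an insertion of `"zappa"` for the second.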

§Notes

Upserts currently only work with totally ordered timestamps.

In the case of ties in timestamps (concurrent updates to the same key), the operator chooses the greatest value according to the Option<Val> ordering, which prefers any value to None and otherwise chooses the greatest value (informally, as if the updates were applied in order of value).
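The tie-breaking rule relies on the standard ordering of `Option`, in which `None` sorts below every `Some(_)` and two `Some` values compare by their contents. A small sketch, where the helper name `resolve_tie` is hypothetical and not part of this module:

```rust
/// Picks the winner among upserts to one key that share a timestamp,
/// using the standard `Option` ordering (`None` is the least element).
/// An empty slice also yields `None`, since there is nothing to install.
fn resolve_tie<V: Ord + Clone>(concurrent: &[Option<V>]) -> Option<V> {
    concurrent.iter().max().cloned().flatten()
}
```

For example, among the concurrent upserts `Some("b")`, `None`, and `Some("a")`, the value `"b"` wins; the key is uninstalled only if every concurrent upsert is `None`.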

If the same value is repeated, no change will occur in the output. That may make this operator effective at determining the difference between collections of keyed values, but note that it will not notice the absence of keys in a collection.
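The caveat about absent keys can be illustrated by replaying a new snapshot as upserts over state built from an old one. This is a sketch under stated assumptions: `diff_via_upserts`, the `i64` difference type, and the `BTreeMap` state are illustrative choices, not the operator's API.

```rust
use std::collections::BTreeMap;

/// Loads `old` as the current state, then replays every pair in `new`
/// as a `Some(value)` upsert, returning the `(key, value, diff)` updates.
/// Keys present in `old` but missing from `new` are never mentioned by
/// any upsert, so no retraction is produced for them.
fn diff_via_upserts(old: &[(&str, &str)], new: &[(&str, &str)]) -> Vec<(String, String, i64)> {
    let mut state: BTreeMap<&str, &str> = old.iter().copied().collect();
    let mut updates = Vec::new();
    for &(key, val) in new {
        match state.insert(key, val) {
            // Same value repeated: no output.
            Some(prev) if prev == val => {}
            // Value changed: retract the old one, insert the new one.
            Some(prev) => {
                updates.push((key.to_string(), prev.to_string(), -1));
                updates.push((key.to_string(), val.to_string(), 1));
            }
            // Key was not installed: insert only.
            None => updates.push((key.to_string(), val.to_string(), 1)),
        }
    }
    updates
}
```

With `old` holding keys `a`, `b`, and `c`, and `new` holding an unchanged `a` and a changed `b`, the result contains only the two updates for `b`. Nothing is emitted for `c`; removing it would require an explicit `None` upsert for that key.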

To effect “filtering” in a way that reduces the arrangement footprint, apply a map to the input stream, mapping values that fail the predicate to None values, like so:

// Dropped values should be retained as "uninstall" upserts.
upserts.map(|(key,opt_val)| (key, opt_val.filter(predicate)))
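The same mapping can be tried on an ordinary `Vec` to see what it produces. In this sketch the function name `keep_or_uninstall` is hypothetical, and a `Vec` stands in for the input stream. The point is that a record failing the predicate is kept as a `(key, None)` upsert rather than discarded, so any value previously installed at that key is uninstalled.

```rust
/// Rewrites each upsert so that values failing `predicate` become `None`.
/// Every key is preserved; only the optional value changes.
fn keep_or_uninstall<K, V>(
    upserts: Vec<(K, Option<V>)>,
    predicate: impl Fn(&V) -> bool,
) -> Vec<(K, Option<V>)> {
    upserts
        .into_iter()
        // `Option::filter` returns `None` when the value fails the predicate.
        .map(|(key, opt_val)| (key, opt_val.filter(|v| predicate(v))))
        .collect()
}
```

With the predicate `|v| *v >= 10`, the upsert `("a", Some(5))` becomes `("a", None)`, while `("b", Some(50))` and `("c", None)` pass through unchanged.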

§Example

// define a new timely dataflow computation.
timely::execute_from_args(std::env::args().skip(1), move |worker| {

    type Key = String;
    type Val = String;

    let mut input = timely::dataflow::InputHandle::new();
    let mut probe = timely::dataflow::ProbeHandle::new();

    // Create a dataflow demonstrating upserts.
    //
    // Upserts are a sequence of records (key, Option<Val>) where the intended
    // value associated with a key is the most recent value, and if that is
    // `None` then the key is removed (until a new value shows up).
    //
    // The challenge with upserts is that the value to *retract* isn't supplied
    // as part of the input stream. We have to determine what it should be!

    worker.dataflow(|scope| {

        use timely::dataflow::operators::Input;
        use differential_dataflow::trace::implementations::ValSpine;
        use differential_dataflow::operators::arrange::upsert;

        let stream = scope.input_from(&mut input);
        let arranged = upsert::arrange_from_upsert::<_, ValSpine<Key, Val, _, _>>(&stream, &"test");

        arranged
            .as_collection(|k,v| (k.clone(), v.clone()))
            .inspect(|x| println!("Observed: {:?}", x))
            .probe_with(&mut probe);
    });

    // Introduce the key, with a specific value.
    input.send(("frank".to_string(), Some("mcsherry".to_string()), 3));
    input.advance_to(4);
    while probe.less_than(input.time()) { worker.step(); }

    // Change the value to a different value.
    input.send(("frank".to_string(), Some("zappa".to_string()), 4));
    input.advance_to(5);
    while probe.less_than(input.time()) { worker.step(); }

    // Remove the key and its value.
    input.send(("frank".to_string(), None, 5));
    input.advance_to(9);
    while probe.less_than(input.time()) { worker.step(); }

    // Introduce a new, totally different value.
    input.send(("frank".to_string(), Some("oz".to_string()), 9));
    input.advance_to(10);
    while probe.less_than(input.time()) { worker.step(); }

    // Repeat the value, which should produce no output.
    input.send(("frank".to_string(), Some("oz".to_string()), 11));
    input.advance_to(12);
    while probe.less_than(input.time()) { worker.step(); }

    // Remove the key and value.
    input.send(("frank".to_string(), None, 15));
    input.close();

}).unwrap();

Functions§