1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use persist::client::{StreamReadHandle, StreamWriteHandle};
use timely::dataflow::operators::{Concat, Map, OkErr};
use timely::dataflow::{Scope, Stream};
use timely::progress::Antichain;

use dataflow_types::{DataflowError, DecodeError, SourceError, SourceErrorDetails};
use persist::operators::replay::Replay;
use persist::operators::stream::{Persist, RetractUnsealed};
use persist_types::Codec;
use repr::{Diff, Row, Timestamp};

/// Persist configuration for `ENVELOPE NONE` sources.
///
/// Bundles the seal timestamp together with the read and write handles of the persistent
/// collection. Updates are stored as keys of type `V`; the value type of the collection is `()`.
#[derive(Debug, Clone)]
pub struct PersistentEnvelopeNoneConfig<V: Codec> {
    /// The timestamp up to which data should be read when restoring.
    pub upper_seal_ts: u64,

    /// [`StreamReadHandle`] for the collection that we should persist to. Restoring takes a
    /// snapshot from this handle and replays it.
    pub read_handle: StreamReadHandle<V, ()>,

    /// [`StreamWriteHandle`] for the collection that we should persist to. New updates are
    /// written through this handle.
    pub write_handle: StreamWriteHandle<V, ()>,
}

impl<V: Codec> PersistentEnvelopeNoneConfig<V> {
    /// Assembles a [`PersistentEnvelopeNoneConfig`] out of its three components.
    ///
    /// The arguments are stored as-is in the fields of the same name; no validation is performed.
    pub fn new(
        upper_seal_ts: u64,
        read_handle: StreamReadHandle<V, ()>,
        write_handle: StreamWriteHandle<V, ()>,
    ) -> Self {
        Self {
            upper_seal_ts,
            read_handle,
            write_handle,
        }
    }
}

/// Writes the updates carried by `stream` to persistence, forwards them downstream, and
/// additionally emits the updates that were already persisted before startup.
///
/// Previously persisted updates are replayed from a snapshot of `persist_config.read_handle` as
/// of `as_of_frontier` and then passed through `retract_unsealed` together with
/// `persist_config.upper_seal_ts`.
///
/// NOTE(review): going by its name, `retract_unsealed` retracts replayed updates that were not
/// sealed as of `upper_seal_ts`; confirm the exact boundary semantics against that operator.
///
/// Returns two streams:
///
/// * the newly persisted updates concatenated with the replayed updates, and
/// * every error raised while replaying, retracting or persisting, wrapped as a
///   [`SourceErrorDetails::Persistence`] error attributed to `source_name`.
pub(crate) fn persist_and_replay<G>(
    source_name: &str,
    stream: &Stream<G, (Result<Row, DecodeError>, Timestamp, Diff)>,
    as_of_frontier: &Antichain<Timestamp>,
    persist_config: PersistentEnvelopeNoneConfig<Result<Row, DecodeError>>,
) -> (
    Stream<G, (Result<Row, DecodeError>, Timestamp, Diff)>,
    Stream<G, (DataflowError, Timestamp, Diff)>,
)
where
    G: Scope<Timestamp = Timestamp>,
{
    let replay_scope = stream.scope();

    // Bring back what was persisted earlier and separate the usable updates from replay errors.
    let snapshot = persist_config.read_handle.snapshot();
    let (replayed, replay_errs) = replay_scope
        .replay(snapshot, as_of_frontier)
        .ok_err(split_ok_err);

    // Retraction needs its own write handle; the original handle is consumed by `persist` below.
    let (replayed, retraction_errs) = replayed.retract_unsealed(
        source_name,
        persist_config.write_handle.clone(),
        persist_config.upper_seal_ts,
    );
    let replay_errs = replay_errs.concat(&retraction_errs);

    // Persist only understands (key, value) streams, so the update becomes the key and the value
    // is the unit type.
    let (persisted, write_errs) = stream
        .map(|(row, ts, diff)| ((row, ()), ts, diff))
        .persist(source_name, persist_config.write_handle);

    // The error-mapping closure must own the name, because it outlives this function call.
    let owned_name = source_name.to_string();
    let errs = write_errs
        .concat(&replay_errs)
        .map(move |(err, ts, diff)| {
            let details = SourceErrorDetails::Persistence(err);
            (SourceError::new(owned_name.clone(), details).into(), ts, diff)
        });

    // Merge new and replayed updates, then drop the unit value that persistence required.
    let oks = persisted
        .concat(&replayed)
        .map(|((row, ()), ts, diff)| (row, ts, diff));

    (oks, errs)
}

// TODO: Maybe we should finally move this to some central place and re-use. There seem to be
// enough instances of this by now.
/// Turns an update whose payload is a `Result` into a `Result` of updates.
///
/// The timestamp and diff are attached unchanged to whichever side the payload falls on: an `Ok`
/// payload yields `Ok((kv, ts, diff))`, an `Err` payload yields `Err((err, ts, diff))`.
fn split_ok_err<K, V>(
    x: (Result<(K, V), String>, u64, isize),
) -> Result<((K, V), u64, isize), (String, u64, isize)> {
    let (payload, ts, diff) = x;
    payload
        .map(|kv| (kv, ts, diff))
        .map_err(|err| (err, ts, diff))
}