1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
use persist::client::{StreamReadHandle, StreamWriteHandle};
use timely::dataflow::operators::{Concat, Map, OkErr};
use timely::dataflow::{Scope, Stream};
use timely::progress::Antichain;
use dataflow_types::{DataflowError, DecodeError, SourceError, SourceErrorDetails};
use persist::operators::replay::Replay;
use persist::operators::stream::{Persist, RetractUnsealed};
use persist_types::Codec;
use repr::{Diff, Row, Timestamp};
/// Handles and seal timestamp used by `persist_and_replay` to persist a stream and to
/// restore what was previously persisted for it.
#[derive(Debug, Clone)]
pub struct PersistentEnvelopeNoneConfig<V: Codec> {
    /// Timestamp handed to `retract_unsealed` when restoring previously persisted updates.
    pub upper_seal_ts: u64,
    /// Handle whose snapshot is replayed when restoring.
    pub read_handle: StreamReadHandle<V, ()>,
    /// Handle that new updates are persisted to; it is also passed to `retract_unsealed`.
    pub write_handle: StreamWriteHandle<V, ()>,
}

impl<V: Codec> PersistentEnvelopeNoneConfig<V> {
    /// Bundles the given seal timestamp and stream handles into a config.
    pub fn new(
        upper_seal_ts: u64,
        read_handle: StreamReadHandle<V, ()>,
        write_handle: StreamWriteHandle<V, ()>,
    ) -> Self {
        Self {
            upper_seal_ts,
            read_handle,
            write_handle,
        }
    }
}
pub(crate) fn persist_and_replay<G>(
source_name: &str,
stream: &Stream<G, (Result<Row, DecodeError>, Timestamp, Diff)>,
as_of_frontier: &Antichain<Timestamp>,
persist_config: PersistentEnvelopeNoneConfig<Result<Row, DecodeError>>,
) -> (
Stream<G, (Result<Row, DecodeError>, Timestamp, Diff)>,
Stream<G, (DataflowError, Timestamp, Diff)>,
)
where
G: Scope<Timestamp = Timestamp>,
{
let scope = stream.scope();
let (restored_oks, restored_errs) = {
let snapshot = persist_config.read_handle.snapshot();
let (restored_oks, restored_errs) =
scope.replay(snapshot, as_of_frontier).ok_err(split_ok_err);
let (restored_oks, retract_errs) = restored_oks.retract_unsealed(
source_name,
persist_config.write_handle.clone(),
persist_config.upper_seal_ts,
);
let combined_errs = restored_errs.concat(&retract_errs);
(restored_oks, combined_errs)
};
let flattened_stream = stream.map(|(row, ts, diff)| ((row, ()), ts, diff));
let (flattened_stream, persist_errs) =
flattened_stream.persist(source_name, persist_config.write_handle);
let persist_errs = persist_errs.concat(&restored_errs);
let source_name = source_name.to_string();
let persist_errs = persist_errs.map(move |(err, ts, diff)| {
let source_error =
SourceError::new(source_name.clone(), SourceErrorDetails::Persistence(err));
(source_error.into(), ts, diff)
});
let combined_stream = flattened_stream.concat(&restored_oks);
let combined_stream = combined_stream.map(|((k, ()), ts, diff)| (k, ts, diff));
(combined_stream, persist_errs)
}
/// Splits a replayed `(result, timestamp, diff)` update so that `ok_err` can route it.
///
/// The timestamp and diff are kept on whichever side the inner `Result` selects:
/// `Ok(kv)` becomes `Ok((kv, ts, diff))` and `Err(msg)` becomes `Err((msg, ts, diff))`.
fn split_ok_err<K, V>(
    x: (Result<(K, V), String>, u64, isize),
) -> Result<((K, V), u64, isize), (String, u64, isize)> {
    let (outcome, ts, diff) = x;
    outcome
        .map(|kv| (kv, ts, diff))
        .map_err(|msg| (msg, ts, diff))
}