1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! A Timely Dataflow operator that emits the records in a snapshot.

use persist_types::Codec;
use timely::dataflow::operators::generic::operator;
use timely::dataflow::operators::Map;
use timely::dataflow::{Scope, Stream};
use timely::progress::Antichain;
use timely::{Data as TimelyData, PartialOrder};

use crate::client::DecodedSnapshot;
use crate::error::Error;
use crate::indexed::Snapshot;

/// Extension trait for [`Stream`].
pub trait Replay<G: Scope<Timestamp = u64>, K: TimelyData, V: TimelyData> {
    /// Emits each record in a snapshot.
    ///
    /// Successfully decoded records appear as `Ok((key, value))` with their
    /// timestamp and diff. Failures — either an `Err` snapshot or an `as_of_frontier`
    /// that is not at or beyond the snapshot's since frontier — are routed into
    /// the same stream as `Err(String)` records so that downstream dataflow
    /// logic can split them out (e.g. with `ok_err`).
    fn replay(
        &self,
        snapshot: Result<DecodedSnapshot<K, V>, Error>,
        as_of_frontier: &Antichain<u64>,
    ) -> Stream<G, (Result<(K, V), String>, u64, isize)>;
}

impl<G, K, V> Replay<G, K, V> for G
where
    G: Scope<Timestamp = u64>,
    K: TimelyData + Codec,
    V: TimelyData + Codec,
{
    /// Replays `snapshot` as a source operator, emitting all of its records
    /// in a single activation on worker 0 and then dropping the capability.
    fn replay(
        &self,
        snapshot: Result<DecodedSnapshot<K, V>, Error>,
        as_of_frontier: &Antichain<u64>,
    ) -> Stream<G, (Result<(K, V), String>, u64, isize)> {
        // TODO: This currently works by only emitting the persisted
        // data on worker 0 because that was the simplest thing to do
        // initially. Instead, we should shard up the responsibility
        // between all the workers.
        let active_worker = self.index() == 0;

        // Clone so the owned frontier can move into the operator closure.
        let as_of_frontier = as_of_frontier.clone();

        let result_stream: Stream<G, Result<((K, V), u64, isize), Error>> = operator::source(
            self,
            "Replay",
            move |cap, _info| {
                // Only the active worker retains the snapshot together with the
                // initial capability; every other worker drops its capability
                // right away and never emits anything.
                let mut snapshot_cap = if active_worker {
                    Some((snapshot, cap))
                } else {
                    None
                };

                move |output| {
                    // `take()` guarantees the snapshot is replayed at most once;
                    // dropping the capability afterwards lets downstream
                    // frontiers advance past this source.
                    let (snapshot, cap) = match snapshot_cap.take() {
                        Some(x) => x,
                        None => return, // We were already invoked and consumed our snapshot.
                    };

                    let mut session = output.session(&cap);

                    match snapshot {
                        Ok(snapshot) => {
                            let snapshot_since = snapshot.since();

                            // The snapshot can only be read at times at or
                            // beyond its since frontier: data may already be
                            // logically compacted up to `since`, so an earlier
                            // as_of cannot be answered correctly.
                            if PartialOrder::less_than(&as_of_frontier, &snapshot_since) {
                                session.give(Err(Error::String(format!(
                                    "replaying persisted data: snapshot since ({:?}) is beyond expected as_of ({:?})",
                                    snapshot_since, as_of_frontier
                                ))));

                                return;
                            }

                            // TODO: Periodically yield to let the rest of the dataflow
                            // reduce this down.
                            for x in snapshot.into_iter() {
                                if let Ok((_, ts, _)) = &x {
                                    // The raw update data held internally in the
                                    // snapshot may not be physically compacted up to
                                    // the logical compaction frontier of since.
                                    // Snapshot handles advancing any necessary data but
                                    // we double check that invariant here.
                                    debug_assert!(snapshot_since.less_equal(ts));
                                }
                                session.give(x);
                            }
                        }
                        Err(e) => {
                            // The snapshot itself could not be obtained; surface
                            // the failure as a record-level error.
                            session.give(Err(Error::String(format!(
                                "replaying persisted data: {}",
                                e
                            ))));
                        }
                    }
                }
            },
        );
        // Split the transport-level Result into the (Result<(K, V), String>, ts, diff)
        // triple shape that downstream persist operators expect.
        result_stream.map(|x| {
            match x {
                Ok((kv, ts, diff)) => (Ok(kv), ts, diff),
                Err(err) => {
                    // TODO: Make the responsibility for retries in the presence
                    // of transient storage failures lie with the snapshot (with
                    // appropriate monitoring). At the limit, we should be able
                    // to retry even the compaction+deletion of a batch that we
                    // were supposed to fetch by grabbing the current version of
                    // META. However, note that there is a case where we well
                    // and truly have to give up: when the compaction frontier
                    // has advanced past the ts this snapshot is reading at.
                    //
                    // TODO: Figure out a meaningful timestamp to use here? As
                    // mentioned above, this error will eventually represent
                    // something totally unrecoverable, so it should probably go
                    // to the system errors (once that's built) instead of this
                    // err_stream. Aljoscha suggests that system errors likely
                    // won't have a timestamp in the source domain
                    // (https://github.com/MaterializeInc/materialize/pull/8212#issuecomment-915877541),
                    // so perhaps this problem just goes away at some point. In
                    // the meantime, we never downgrade capabilities on the
                    // err_stream until the operator is finished, so it
                    // technically works to emit it at ts=0 for now.
                    (Err(err.to_string()), 0, 1)
                }
            }
        })
    }
}

#[cfg(test)]
mod tests {

    use timely::dataflow::operators::capture::Extract;
    use timely::dataflow::operators::{Capture, OkErr};
    use timely::progress::Antichain;

    use crate::error::Error;
    use crate::mem::MemRegistry;
    use crate::operators::split_ok_err;

    use super::*;

    /// Replaying at an as_of equal to the compaction frontier (which is
    /// beyond the seal) must succeed, with updates forwarded to that frontier.
    #[test]
    fn compaction_beyond_seal() -> Result<(), Error> {
        let mut reg = MemRegistry::new();
        let persist = reg.runtime_no_reentrance()?;

        let (write_handle, _) = persist.create_or_load::<String, String>("1");

        let updates = [
            (("k1".into(), "v1".into()), 1, 1),
            (("k2".into(), "v2".into()), 2, 1),
        ];
        write_handle.write(&updates).recv()?;

        write_handle.seal(3).recv().expect("seal was successful");
        write_handle
            .allow_compaction(Antichain::from_elem(10))
            .recv()?;

        let (ok_captured, err_captured) = timely::execute_directly(move |worker| {
            worker.dataflow(|scope| {
                let (_write, read) = persist.create_or_load::<String, String>("1");
                let (ok_stream, err_stream) = scope
                    .replay(read.snapshot(), &Antichain::from_elem(10))
                    .ok_err(split_ok_err);
                (ok_stream.capture(), err_stream.capture())
            })
        });

        // No errors expected on a valid replay.
        let replay_errs: Vec<String> = err_captured
            .extract()
            .into_iter()
            .flat_map(|(_time, data)| data.into_iter().map(|(err, _ts, _diff)| err))
            .collect();
        assert_eq!(replay_errs, Vec::<String>::new());

        let mut actual: Vec<_> = ok_captured
            .extract()
            .into_iter()
            .flat_map(|(_, xs)| xs.into_iter())
            .collect();
        actual.sort();

        // Both updates come out forwarded to the compaction frontier (ts=10).
        let mut expected = vec![
            (("k1".into(), "v1".into()), 10, 1),
            (("k2".into(), "v2".into()), 10, 1),
        ];
        expected.sort();

        assert_eq!(actual, expected);

        Ok(())
    }

    /// A sealed, uncompacted snapshot replays exactly the written records.
    #[test]
    fn replay() -> Result<(), Error> {
        let mut reg = MemRegistry::new();
        let persist = reg.runtime_no_reentrance()?;

        let (write_handle, _) = persist.create_or_load("1");
        for ts in 1..=5 {
            let update = ((ts.to_string(), ()), ts, 1);
            write_handle
                .write(&[update])
                .recv()
                .expect("write was successful");
        }
        write_handle.seal(6).recv().expect("seal was successful");

        let (ok_captured, err_captured) = timely::execute_directly(move |worker| {
            worker.dataflow(|scope| {
                let (_write, read) = persist.create_or_load::<String, ()>("1");
                let (ok_stream, err_stream) = scope
                    .replay(read.snapshot(), &Antichain::from_elem(0))
                    .ok_err(split_ok_err);
                (ok_stream.capture(), err_stream.capture())
            })
        });

        // A valid replay produces no errors.
        let replay_errs: Vec<_> = err_captured
            .extract()
            .into_iter()
            .flat_map(|(_time, data)| data.into_iter().map(|(err, _ts, _diff)| err))
            .collect();
        assert!(replay_errs.is_empty());

        // Every written key comes back out.
        let mut actual: Vec<_> = ok_captured
            .extract()
            .into_iter()
            .flat_map(|(_, xs)| xs.into_iter().map(|((k, _), _, _)| k))
            .collect();
        actual.sort();

        let expected: Vec<_> = (1usize..=5usize).map(|x| x.to_string()).collect();
        assert_eq!(actual, expected);

        Ok(())
    }

    /// Replaying at an as_of behind the snapshot's since frontier must fail
    /// with a descriptive error and produce no data.
    #[test]
    fn replay_invalid_as_of() -> Result<(), Error> {
        let mut reg = MemRegistry::new();
        let persist = reg.runtime_no_reentrance()?;

        let (write_handle, _) = persist.create_or_load("1");
        for ts in 1..=5 {
            let update = ((ts.to_string(), ()), ts, 1);
            write_handle
                .write(&[update])
                .recv()
                .expect("write was successful");
        }
        write_handle.seal(6).recv().expect("seal was successful");
        write_handle
            .allow_compaction(Antichain::from_elem(5))
            .recv()
            .expect("compaction was successful");

        // Replay at as_of=0, which is behind the since frontier of 5.
        let (ok_captured, err_captured) = timely::execute_directly(move |worker| {
            worker.dataflow(|scope| {
                let (_write, read) = persist.create_or_load::<String, ()>("1");
                let (ok_stream, err_stream) = scope
                    .replay(read.snapshot(), &Antichain::from_elem(0))
                    .ok_err(split_ok_err);
                (ok_stream.capture(), err_stream.capture())
            })
        });

        // No data may leak out on the ok channel.
        let ok_records: Vec<_> = ok_captured
            .extract()
            .into_iter()
            .flat_map(|(_time, data)| data.into_iter().map(|(data, _ts, _diff)| data))
            .collect();
        assert!(ok_records.is_empty());

        let replay_errs: Vec<_> = err_captured
            .extract()
            .into_iter()
            .flat_map(|(_time, data)| data.into_iter().map(|(err, _ts, _diff)| err))
            .collect();

        assert_eq!(
            replay_errs,
            vec!["replaying persisted data: snapshot since (Antichain { elements: [5] }) is beyond expected as_of (Antichain { elements: [0] })".to_string()]);

        Ok(())
    }
}