1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
use differential_dataflow::{AsCollection, Collection, Data};
use mz_expr::refresh_schedule::RefreshSchedule;
use mz_ore::soft_panic_or_log;
use mz_repr::{Diff, Timestamp};
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
use timely::dataflow::Scope;
use timely::progress::Antichain;

/// This is for REFRESH options on materialized views. It adds an operator that rounds up the
/// timestamps of data and frontiers to the time of the next refresh. See
/// `doc/developer/design/20231027_refresh_every_mv.md`.
///
/// Note that this currently only works with 1-dim timestamps. (This is not an issue for WMR,
/// because iteration numbers should disappear by the time the data gets to the Persist sink.)
pub(crate) fn apply_refresh<G, D>(
    coll: Collection<G, D, Diff>,
    refresh_schedule: RefreshSchedule,
) -> Collection<G, D, Diff>
where
    G: Scope<Timestamp = Timestamp>,
    D: Data,
{
    // We need to disconnect the reachability graph and manage capabilities manually, because we'd
    // like to round up frontiers as well as data: as soon as our input frontier passes a refresh
    // time, we'll round it up to the next refresh time.
    let mut builder = OperatorBuilder::new("apply_refresh".to_string(), coll.scope());
    let (mut output_buf, output_stream) = builder.new_output::<ConsolidatingContainerBuilder<_>>();
    let mut input = builder.new_input_connection(&coll.inner, Pipeline, vec![Antichain::new()]);
    builder.build(move |capabilities| {
        // This capability directly controls this operator's output frontier (because we have
        // disconnected the input above). We wrap it in an Option so we can drop it to advance to
        // the empty output frontier when the last refresh is done. (We must be careful that we only
        // ever emit output updates at times that are at or beyond this capability.)
        let mut capability = capabilities.into_iter().next(); // (We have 1 one input.)
        let mut buffer = Vec::new();
        move |frontiers| {
            let mut output_handle_core = output_buf.activate();
            input.for_each(|input_cap, data| {
                // Note that we can't use `input_cap` to get an output session because we might have
                // advanced our output frontier already beyond the frontier of this capability.

                // `capability` will be None if we are past the last refresh. We have made sure to
                // not receive any data that is after the last refresh by setting the `until` of the
                // dataflow to the last refresh.
                let Some(capability) = capability.as_mut() else {
                    soft_panic_or_log!(
                        "should have a capability if we received data. input_cap: {:?}, frontier: {:?}",
                        input_cap.time(),
                        frontiers[0].frontier()
                    );
                    return;
                };
                let mut output_buf = output_handle_core.session_with_builder(&capability);

                data.swap(&mut buffer);
                let mut cached_ts: Option<Timestamp> = None;
                let mut cached_rounded_up_data_ts = None;
                for (d, ts, r) in buffer.drain(..) {
                    let rounded_up_data_ts = {
                        // We cache the rounded up timestamp for the last seen timestamp,
                        // because the rounding up has a non-negligible cost. Caching for
                        // just the 1 last timestamp helps already, because in some common
                        // cases, we'll be seeing a lot of identical timestamps, e.g.,
                        // during a rehydration, or when we have much more than 1000 records
                        // during a single second.
                        if cached_ts != Some(ts) {
                            cached_ts = Some(ts);
                            cached_rounded_up_data_ts = refresh_schedule.round_up_timestamp(ts);
                        }
                        cached_rounded_up_data_ts
                    };
                    match rounded_up_data_ts {
                        Some(rounded_up_ts) => {
                            output_buf.give((d, rounded_up_ts, r));
                        }
                        None => {
                            // This record is after the last refresh, which is not possible because
                            // we set the dataflow `until` to the last refresh.
                            soft_panic_or_log!("Received data after the last refresh");
                        }
                    }
                }
            });

            // Round up the frontier.
            // Note that `round_up_timestamp` is monotonic. This is needed to ensure that the
            // timestamp (t) of any received data that has a larger timestamp than the original
            // frontier (f) will get rounded up to a time that is at least at the rounded up
            // frontier. In other words, monotonicity ensures that
            // when `t >= f` then `round_up_timestamp(t) >= round_up_timestamp(f)`.
            match frontiers[0].frontier().as_option() { // (We have only 1 input, so only 1 frontier.)
                Some(ts) => {
                    match refresh_schedule.round_up_timestamp(*ts) {
                        Some(rounded_up_ts) => {
                            capability
                                .as_mut()
                                .expect("capability must exist if frontier is <= last refresh")
                                .downgrade(&rounded_up_ts);
                        }
                        None => {
                            // We are past the last refresh. Drop the capability to signal that we
                            // are done.
                            capability = None;
                            // We can only get here if we see the frontier advancing to a time after the last refresh,
                            // but not empty. This is ok, because even though we set the `until` to the last refresh,
                            // frontier advancements might still happen past the `until`.
                        }
                    }
                }
                None => {
                    capability = None;
                }
            }
        }
    });

    output_stream.as_collection()
}