mz_compute/sink/refresh.rs
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
use differential_dataflow::{AsCollection, Data, VecCollection};
use mz_ore::soft_panic_or_log;
use mz_repr::refresh_schedule::RefreshSchedule;
use mz_repr::{Diff, Timestamp};
use timely::dataflow::Scope;
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::operators::generic::OutputBuilder;
use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;

/// This is for REFRESH options on materialized views. It adds an operator that rounds up the
/// timestamps of data and frontiers to the time of the next refresh. See
/// `doc/developer/design/20231027_refresh_every_mv.md`.
///
/// Note that this currently only works with 1-dim timestamps. (This is not an issue for WMR,
/// because iteration numbers should disappear by the time the data gets to the Persist sink.)
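///
/// For intuition, with a hypothetical hourly schedule (e.g., `REFRESH EVERY '1 hour'` aligned to
/// the top of the hour): an update whose timestamp corresponds to 10:17 is emitted at the 11:00
/// refresh time, and an input frontier of 10:17 is likewise rounded up to an output frontier of
/// 11:00. Past the last refresh there is no next refresh time to round up to: data should never
/// arrive there (the dataflow's `until` is set to the last refresh), and for the frontier we
/// simply drop this operator's capability.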
pub(crate) fn apply_refresh<G, D>(
    coll: VecCollection<G, D, Diff>,
    refresh_schedule: RefreshSchedule,
) -> VecCollection<G, D, Diff>
where
    G: Scope<Timestamp = Timestamp>,
    D: Data,
{
    // We need to disconnect the reachability graph and manage capabilities manually, because we'd
    // like to round up frontiers as well as data: as soon as our input frontier passes a refresh
    // time, we'll round it up to the next refresh time.
    let mut builder = OperatorBuilder::new("apply_refresh".to_string(), coll.scope());
    let (output_buf, output_stream) = builder.new_output();
    let mut output_buf = OutputBuilder::<_, ConsolidatingContainerBuilder<_>>::from(output_buf);

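    // The empty connection list (`[]`) below is what disconnects this input from the output in
    // timely's reachability graph: the input frontier then doesn't hold back the output frontier,
    // and we instead drive the output frontier exclusively through the capability we hold in the
    // operator logic.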
    let mut input = builder.new_input_connection(&coll.inner, Pipeline, []);
    builder.build(move |capabilities| {
        // This capability directly controls this operator's output frontier (because we have
        // disconnected the input above). We wrap it in an Option so we can drop it to advance to
        // the empty output frontier when the last refresh is done. (We must be careful that we only
        // ever emit output updates at times that are at or beyond this capability.)
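        // (Downgrading this capability advances the output frontier to the rounded-up time;
        // dropping it, i.e., setting the Option to `None`, advances the output frontier to the
        // empty frontier, telling downstream operators that this output is complete.)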
        let mut capability = capabilities.into_iter().next(); // (We have 1 output, so exactly 1 capability.)
        move |frontiers| {
            let mut output_handle_core = output_buf.activate();
            input.for_each(|input_cap, data| {
                // Note that we can't use `input_cap` to get an output session, because we might
                // have already advanced our output frontier beyond the time of this capability.

                // `capability` will be None if we are past the last refresh. We have made sure to
                // not receive any data that is after the last refresh by setting the `until` of the
                // dataflow to the last refresh.
                let Some(capability) = capability.as_mut() else {
                    soft_panic_or_log!(
                        "should have a capability if we received data. input_cap: {:?}, frontier: {:?}",
                        input_cap.time(),
                        frontiers[0].frontier()
                    );
                    return;
                };
                let mut output_buf = output_handle_core.session_with_builder(&capability);

                let mut cached_ts: Option<Timestamp> = None;
                let mut cached_rounded_up_data_ts = None;
                for (d, ts, r) in data.drain(..) {
                    let rounded_up_data_ts = {
                        // We cache the rounded-up timestamp for the most recently seen timestamp,
                        // because the rounding up has a non-negligible cost. Caching just the
                        // single most recent timestamp already helps, because in some common cases
                        // we see long runs of identical timestamps, e.g., during a rehydration, or
                        // when there are far more than 1000 records within a single second.
                        if cached_ts != Some(ts) {
                            cached_ts = Some(ts);
                            cached_rounded_up_data_ts = refresh_schedule.round_up_timestamp(ts);
                        }
                        cached_rounded_up_data_ts
                    };
                    match rounded_up_data_ts {
                        Some(rounded_up_ts) => {
                            output_buf.give((d, rounded_up_ts, r));
                        }
                        None => {
                            // This record is after the last refresh, which is not possible because
                            // we set the dataflow `until` to the last refresh.
                            soft_panic_or_log!("Received data after the last refresh");
                        }
                    }
                }
            });

            // Round up the frontier.
            // Note that `round_up_timestamp` is monotonic. This is needed to ensure that the
            // timestamp (t) of any received data that has a larger timestamp than the original
            // frontier (f) will get rounded up to a time that is at least at the rounded up
            // frontier. In other words, monotonicity ensures that
            // when `t >= f` then `round_up_timestamp(t) >= round_up_timestamp(f)`.
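            // A worked example with made-up refresh times: if refreshes are at 100 and 200 and
            // the input frontier reaches 150, we downgrade the capability to
            // round_up_timestamp(150) = 200. Any datum still to arrive has ts >= 150, so by
            // monotonicity its rounded-up timestamp is at least 200, i.e., it can still be
            // emitted at or beyond the held capability.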
            match frontiers[0].frontier().as_option() { // (We have only 1 input, so only 1 frontier.)
                Some(ts) => {
                    match refresh_schedule.round_up_timestamp(*ts) {
                        Some(rounded_up_ts) => {
                            capability
                                .as_mut()
                                .expect("capability must exist if frontier is <= last refresh")
                                .downgrade(&rounded_up_ts);
                        }
                        None => {
                            // We are past the last refresh. Drop the capability to signal that
                            // we are done.
                            capability = None;
                            // We can only get here if we see the frontier advancing to a time
                            // after the last refresh, but not empty. This is ok, because even
                            // though we set the `until` to the last refresh, frontier
                            // advancements might still happen past the `until`.
                        }
                    }
                }
                None => {
                    capability = None;
                }
            }
        }
    });

    output_stream.as_collection()
}