Skip to main content

mz_interchange/
envelopes.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::BTreeMap;
11use std::iter;
12use std::sync::LazyLock;
13
14use differential_dataflow::trace::implementations::BatchContainer;
15use differential_dataflow::trace::{BatchReader, Cursor};
16use itertools::EitherOrBoth;
17use maplit::btreemap;
18use mz_ore::cast::CastFrom;
19use mz_repr::{
20    CatalogItemId, ColumnName, Datum, Diff, Row, RowPacker, SqlColumnType, SqlScalarType,
21};
22
23use crate::avro::DiffPair;
24
25/// Walks `batch` and invokes `on_diff_pair` for each `DiffPair` at each
26/// `(key, timestamp)`.
27///
28/// Within a key, diffs are partitioned by sign into retractions (befores) and
29/// insertions (afters), sorted by timestamp, and zipped into `DiffPair`s via a
30/// merge-join. Pairs are emitted in ascending timestamp order for a given key;
31/// no ordering is guaranteed across keys. Callers are responsible for tracking
32/// `(key, timestamp)` boundaries themselves if they need to detect groups
33/// with more than one pair (e.g., for primary-key violation checks).
34pub fn for_each_diff_pair<B, F>(batch: &B, mut on_diff_pair: F)
35where
36    B: BatchReader<Diff = Diff>,
37    B::Time: Copy,
38    B::ValOwn: 'static,
39    F: FnMut(&<B::KeyContainer as BatchContainer>::Owned, B::Time, DiffPair<B::ValOwn>),
40{
41    let mut befores: Vec<(B::Time, B::ValOwn, usize)> = vec![];
42    let mut afters: Vec<(B::Time, B::ValOwn, usize)> = vec![];
43
44    let mut cursor = batch.cursor();
45    while cursor.key_valid(batch) {
46        let k = cursor.key(batch);
47
48        // Partition updates at this key into retractions (befores) and
49        // insertions (afters).
50        while cursor.val_valid(batch) {
51            let v = cursor.val(batch);
52            cursor.map_times(batch, |t, diff| {
53                let diff = B::owned_diff(diff);
54                let update = (
55                    B::owned_time(t),
56                    B::owned_val(v),
57                    usize::cast_from(diff.unsigned_abs()),
58                );
59                if diff < Diff::ZERO {
60                    befores.push(update);
61                } else {
62                    afters.push(update);
63                }
64            });
65            cursor.step_val(batch);
66        }
67
68        befores.sort_by_key(|(t, _v, _diff)| *t);
69        afters.sort_by_key(|(t, _v, _diff)| *t);
70
71        // The use of `repeat_n()` here is a bit subtle, but load bearing.
72        // In the common case, cnt = 1, and we want to avoid cloning the value if possible. In the naive
73        // implementation, we might use `iter::repeat((t, v)).take(cnt)`, but that would clone `v` `cnt` times even
74        // when `cnt = 1`. By contrast, `repeat_n((t, v), cnt)` will return the original `(t, v)` when `cnt = 1`,
75        // and only clone when `cnt > 1`.
76        let fan_out = |(t, v, cnt): (B::Time, B::ValOwn, usize)| iter::repeat_n((t, v), cnt);
77        let befores_iter = befores.drain(..).flat_map(fan_out);
78        let afters_iter = afters.drain(..).flat_map(fan_out);
79
80        let key_owned = <B::KeyContainer as BatchContainer>::into_owned(k);
81
82        for pair in
83            itertools::merge_join_by(befores_iter, afters_iter, |(t1, _v1), (t2, _v2)| t1.cmp(t2))
84        {
85            let (t, before, after) = match pair {
86                EitherOrBoth::Both((t, before), (_t, after)) => (t, Some(before), Some(after)),
87                EitherOrBoth::Left((t, before)) => (t, Some(before), None),
88                EitherOrBoth::Right((t, after)) => (t, None, Some(after)),
89            };
90            on_diff_pair(&key_owned, t, DiffPair { before, after });
91        }
92
93        cursor.step_key(batch);
94    }
95}
96
97// NOTE(benesch): statically allocating transient IDs for the
98// transaction and row types is a bit of a hack to allow us to attach
99// custom names to these types in the generated Avro schema. In the
100// future, these types should be real types that get created in the
101// catalog with userspace IDs when the user creates the sink, and their
102// names and IDs should be plumbed in from the catalog at the moment
103// the sink is created.
104pub(crate) const TRANSACTION_TYPE_ID: CatalogItemId = CatalogItemId::Transient(1);
105pub(crate) const DBZ_ROW_TYPE_ID: CatalogItemId = CatalogItemId::Transient(2);
106
107pub static ENVELOPE_CUSTOM_NAMES: LazyLock<BTreeMap<CatalogItemId, String>> = LazyLock::new(|| {
108    btreemap! {
109        TRANSACTION_TYPE_ID => "transaction".into(),
110        DBZ_ROW_TYPE_ID => "row".into(),
111    }
112});
113
114pub(crate) fn dbz_envelope(
115    names_and_types: Vec<(ColumnName, SqlColumnType)>,
116) -> Vec<(ColumnName, SqlColumnType)> {
117    let row = SqlColumnType {
118        nullable: true,
119        scalar_type: SqlScalarType::Record {
120            fields: names_and_types.into(),
121            custom_id: Some(DBZ_ROW_TYPE_ID),
122        },
123    };
124    vec![("before".into(), row.clone()), ("after".into(), row)]
125}
126
127pub fn dbz_format(rp: &mut RowPacker, dp: DiffPair<Row>) {
128    if let Some(before) = dp.before {
129        rp.push_list_with(|rp| rp.extend_by_row(&before));
130    } else {
131        rp.push(Datum::Null);
132    }
133    if let Some(after) = dp.after {
134        rp.push_list_with(|rp| rp.extend_by_row(&after));
135    } else {
136        rp.push(Datum::Null);
137    }
138}
139
#[cfg(test)]
mod tests {
    // Unit tests for `for_each_diff_pair`, driven by batches sealed from
    // hand-written `((key, val), time, diff)` tuples.

    use differential_dataflow::trace::Batcher;
    use differential_dataflow::trace::implementations::{ValBatcher, ValBuilder};
    use timely::progress::Antichain;

    use super::*;

    /// Seals a single batch from an unordered list of `((key, val), time, diff)`
    /// tuples upper-bounded at `upper`.
    fn batch_from_tuples(
        mut tuples: Vec<((String, String), u64, Diff)>,
        upper: u64,
    ) -> <ValBuilder<String, String, u64, Diff> as differential_dataflow::trace::Builder>::Output
    {
        let mut batcher = ValBatcher::<String, String, u64, Diff>::new(None, 0);
        batcher.push_container(&mut tuples);
        batcher.seal::<ValBuilder<String, String, u64, Diff>>(Antichain::from_elem(upper))
    }

    /// Collects `for_each_diff_pair` invocations into a flat, deterministically
    /// sorted list for easy assertion.
    fn collect_diff_pairs<B>(batch: &B) -> Vec<(String, u64, Option<String>, Option<String>)>
    where
        B: BatchReader<Diff = Diff>,
        B::Time: Copy + Into<u64>,
        B::ValOwn: 'static + Into<String>,
        <B::KeyContainer as BatchContainer>::Owned: Into<String> + Clone,
    {
        let mut out = vec![];
        for_each_diff_pair(batch, |k, t, dp| {
            out.push((
                k.clone().into(),
                t.into(),
                dp.before.map(Into::into),
                dp.after.map(Into::into),
            ));
        });
        out.sort();
        out
    }

    #[mz_ore::test]
    fn empty_batch_yields_no_pairs() {
        let batch = batch_from_tuples(vec![], 1);
        let pairs = collect_diff_pairs(&batch);
        assert!(pairs.is_empty());
    }

    #[mz_ore::test]
    fn single_insertion() {
        let batch = batch_from_tuples(vec![(("k1".into(), "v1".into()), 5, Diff::ONE)], 6);
        let pairs = collect_diff_pairs(&batch);
        assert_eq!(pairs, vec![("k1".into(), 5, None, Some("v1".into()))]);
    }

    #[mz_ore::test]
    fn single_retraction() {
        let batch = batch_from_tuples(vec![(("k1".into(), "v1".into()), 5, -Diff::ONE)], 6);
        let pairs = collect_diff_pairs(&batch);
        assert_eq!(pairs, vec![("k1".into(), 5, Some("v1".into()), None)]);
    }

    #[mz_ore::test]
    fn update_at_same_timestamp() {
        // Retract v1 and insert v2 at the same timestamp → paired into a single
        // DiffPair with both before and after populated.
        let batch = batch_from_tuples(
            vec![
                (("k1".into(), "v1".into()), 5, -Diff::ONE),
                (("k1".into(), "v2".into()), 5, Diff::ONE),
            ],
            6,
        );
        let pairs = collect_diff_pairs(&batch);
        assert_eq!(
            pairs,
            vec![("k1".into(), 5, Some("v1".into()), Some("v2".into()))]
        );
    }

    #[mz_ore::test]
    fn update_across_timestamps() {
        // Insert v1 at t=5, then replace with v2 at t=10.
        let batch = batch_from_tuples(
            vec![
                (("k1".into(), "v1".into()), 5, Diff::ONE),
                (("k1".into(), "v1".into()), 10, -Diff::ONE),
                (("k1".into(), "v2".into()), 10, Diff::ONE),
            ],
            11,
        );
        let pairs = collect_diff_pairs(&batch);
        assert_eq!(
            pairs,
            vec![
                ("k1".into(), 5, None, Some("v1".into())),
                ("k1".into(), 10, Some("v1".into()), Some("v2".into())),
            ]
        );
    }

    #[mz_ore::test]
    fn diff_greater_than_one_fans_out() {
        // Diff=3 becomes three independent `DiffPair`s at the same timestamp.
        let batch = batch_from_tuples(vec![(("k1".into(), "v1".into()), 5, Diff::from(3))], 6);
        let pairs = collect_diff_pairs(&batch);
        assert_eq!(
            pairs,
            vec![
                ("k1".into(), 5, None, Some("v1".into())),
                ("k1".into(), 5, None, Some("v1".into())),
                ("k1".into(), 5, None, Some("v1".into())),
            ]
        );
    }

    #[mz_ore::test]
    fn retraction_multiplicity_fans_out() {
        // Diff=-2 becomes two retraction-only `DiffPair`s at the same timestamp.
        let batch = batch_from_tuples(vec![(("k1".into(), "v1".into()), 5, -Diff::from(2))], 6);
        let pairs = collect_diff_pairs(&batch);
        assert_eq!(
            pairs,
            vec![
                ("k1".into(), 5, Some("v1".into()), None),
                ("k1".into(), 5, Some("v1".into()), None),
            ]
        );
    }

    #[mz_ore::test]
    fn equal_multiplicities_pair_one_to_one() {
        // Two retractions of v1 and two insertions of v2 at t=5 zip into two
        // full before/after pairs; nothing is left unpaired.
        let batch = batch_from_tuples(
            vec![
                (("k1".into(), "v1".into()), 5, -Diff::from(2)),
                (("k1".into(), "v2".into()), 5, Diff::from(2)),
            ],
            6,
        );
        let pairs = collect_diff_pairs(&batch);
        assert_eq!(
            pairs,
            vec![
                ("k1".into(), 5, Some("v1".into()), Some("v2".into())),
                ("k1".into(), 5, Some("v1".into()), Some("v2".into())),
            ]
        );
    }

    #[mz_ore::test]
    fn unequal_multiplicities_leave_surplus_unpaired() {
        // One retraction of v1 and two insertions of v2 at t=5: one insertion
        // pairs with the retraction and the surplus insertion is emitted alone.
        let batch = batch_from_tuples(
            vec![
                (("k1".into(), "v1".into()), 5, -Diff::ONE),
                (("k1".into(), "v2".into()), 5, Diff::from(2)),
            ],
            6,
        );
        let pairs = collect_diff_pairs(&batch);
        assert_eq!(
            pairs,
            vec![
                ("k1".into(), 5, None, Some("v2".into())),
                ("k1".into(), 5, Some("v1".into()), Some("v2".into())),
            ]
        );
    }

    #[mz_ore::test]
    fn multiple_keys_are_independent() {
        let batch = batch_from_tuples(
            vec![
                (("k1".into(), "v1".into()), 5, Diff::ONE),
                (("k2".into(), "v2".into()), 5, Diff::ONE),
            ],
            6,
        );
        let pairs = collect_diff_pairs(&batch);
        assert_eq!(
            pairs,
            vec![
                ("k1".into(), 5, None, Some("v1".into())),
                ("k2".into(), 5, None, Some("v2".into())),
            ]
        );
    }

    #[mz_ore::test]
    fn unpaired_before_and_after_at_different_timestamps() {
        // Retraction at t=5, insertion at t=10 — they do NOT pair because they
        // live at different timestamps.
        let batch = batch_from_tuples(
            vec![
                (("k1".into(), "v1".into()), 5, -Diff::ONE),
                (("k1".into(), "v2".into()), 10, Diff::ONE),
            ],
            11,
        );
        let pairs = collect_diff_pairs(&batch);
        assert_eq!(
            pairs,
            vec![
                ("k1".into(), 5, Some("v1".into()), None),
                ("k1".into(), 10, None, Some("v2".into())),
            ]
        );
    }
}