Skip to main content

mz_interchange/
envelopes.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::BTreeMap;
11use std::iter;
12use std::sync::LazyLock;
13
14use differential_dataflow::trace::implementations::BatchContainer;
15use differential_dataflow::trace::{BatchReader, Cursor};
16use itertools::EitherOrBoth;
17use maplit::btreemap;
18use mz_ore::cast::CastFrom;
19use mz_repr::{
20    CatalogItemId, ColumnName, Datum, Diff, Row, RowPacker, SqlColumnType, SqlScalarType,
21};
22
23use crate::avro::DiffPair;
24
25/// Walks `batch` and invokes `on_diff_pair` for each `DiffPair` at each
26/// `(key, timestamp)`.
27///
28/// Within a key, diffs are partitioned by sign into retractions (befores) and
29/// insertions (afters), sorted by timestamp, and zipped into `DiffPair`s via a
30/// merge-join. Pairs are emitted in ascending timestamp order for a given key;
31/// no ordering is guaranteed across keys. Callers are responsible for tracking
32/// `(key, timestamp)` boundaries themselves if they need to detect groups
33/// with more than one pair (e.g., for primary-key violation checks).
34pub fn for_each_diff_pair<B, F>(batch: &B, mut on_diff_pair: F)
35where
36    B: BatchReader<Diff = Diff>,
37    B::Time: Copy,
38    B::ValOwn: 'static,
39    F: FnMut(&<B::KeyContainer as BatchContainer>::Owned, B::Time, DiffPair<B::ValOwn>),
40{
41    let mut befores: Vec<(B::Time, B::ValOwn, usize)> = vec![];
42    let mut afters: Vec<(B::Time, B::ValOwn, usize)> = vec![];
43
44    let mut cursor = batch.cursor();
45    while cursor.key_valid(batch) {
46        let k = cursor.key(batch);
47
48        // Partition updates at this key into retractions (befores) and
49        // insertions (afters).
50        while cursor.val_valid(batch) {
51            let v = cursor.val(batch);
52            cursor.map_times(batch, |t, diff| {
53                let diff = B::owned_diff(diff);
54                let update = (
55                    B::owned_time(t),
56                    B::owned_val(v),
57                    usize::cast_from(diff.unsigned_abs()),
58                );
59                if diff < Diff::ZERO {
60                    befores.push(update);
61                } else {
62                    afters.push(update);
63                }
64            });
65            cursor.step_val(batch);
66        }
67
68        befores.sort_by_key(|(t, _v, _diff)| *t);
69        afters.sort_by_key(|(t, _v, _diff)| *t);
70
71        // The use of `repeat_n()` here is a bit subtle, but load bearing.
72        // In the common case, cnt = 1, and we want to avoid cloning the value if possible. In the naive
73        // implementation, we might use `iter::repeat((t, v)).take(cnt)`, but that would clone `v` `cnt` times even
74        // when `cnt = 1`. By contrast, `repeat_n((t, v), cnt)` will return the original `(t, v)` when `cnt = 1`,
75        // and only clone when `cnt > 1`.
76        let fan_out = |(t, v, cnt): (B::Time, B::ValOwn, usize)| iter::repeat_n((t, v), cnt);
77        let befores_iter = befores.drain(..).flat_map(fan_out);
78        let afters_iter = afters.drain(..).flat_map(fan_out);
79
80        let key_owned = <B::KeyContainer as BatchContainer>::into_owned(k);
81
82        for pair in
83            itertools::merge_join_by(befores_iter, afters_iter, |(t1, _v1), (t2, _v2)| t1.cmp(t2))
84        {
85            let (t, before, after) = match pair {
86                EitherOrBoth::Both((t, before), (_t, after)) => (t, Some(before), Some(after)),
87                EitherOrBoth::Left((t, before)) => (t, Some(before), None),
88                EitherOrBoth::Right((t, after)) => (t, None, Some(after)),
89            };
90            on_diff_pair(&key_owned, t, DiffPair { before, after });
91        }
92
93        cursor.step_key(batch);
94    }
95}
96
97// NOTE(benesch): statically allocating transient IDs for the
98// transaction and row types is a bit of a hack to allow us to attach
99// custom names to these types in the generated Avro schema. In the
100// future, these types should be real types that get created in the
101// catalog with userspace IDs when the user creates the sink, and their
102// names and IDs should be plumbed in from the catalog at the moment
103// the sink is created.
104pub(crate) const TRANSACTION_TYPE_ID: CatalogItemId = CatalogItemId::Transient(1);
105pub(crate) const DBZ_ROW_TYPE_ID: CatalogItemId = CatalogItemId::Transient(2);
106
107pub static ENVELOPE_CUSTOM_NAMES: LazyLock<BTreeMap<CatalogItemId, String>> = LazyLock::new(|| {
108    btreemap! {
109        TRANSACTION_TYPE_ID => "transaction".into(),
110        DBZ_ROW_TYPE_ID => "row".into(),
111    }
112});
113
114pub(crate) fn dbz_envelope(
115    names_and_types: Vec<(ColumnName, SqlColumnType)>,
116) -> Vec<(ColumnName, SqlColumnType)> {
117    let row = SqlColumnType {
118        nullable: true,
119        scalar_type: SqlScalarType::Record {
120            fields: names_and_types.into(),
121            custom_id: Some(DBZ_ROW_TYPE_ID),
122        },
123    };
124    vec![("before".into(), row.clone()), ("after".into(), row)]
125}
126
127pub fn dbz_format(rp: &mut RowPacker, dp: DiffPair<Row>) {
128    if let Some(before) = dp.before {
129        rp.push_list_with(|rp| rp.extend_by_row(&before));
130    } else {
131        rp.push(Datum::Null);
132    }
133    if let Some(after) = dp.after {
134        rp.push_list_with(|rp| rp.extend_by_row(&after));
135    } else {
136        rp.push(Datum::Null);
137    }
138}
139
140#[cfg(test)]
141mod tests {
142    use differential_dataflow::trace::implementations::chunker::ContainerChunker;
143    use differential_dataflow::trace::implementations::{ValBatcher, ValBuilder};
144    use differential_dataflow::trace::{Batcher, Builder};
145    use timely::container::{ContainerBuilder, PushInto};
146    use timely::progress::Antichain;
147
148    use super::*;
149
150    /// Seals a single batch from an unordered list of `((key, val), time, diff)`
151    /// tuples upper-bounded at `upper`.
152    fn batch_from_tuples(
153        mut tuples: Vec<((String, String), u64, Diff)>,
154        upper: u64,
155    ) -> <ValBuilder<String, String, u64, Diff> as Builder>::Output {
156        // The batcher consumes already-chunked input via `PushInto`; chunking
157        // is the caller's responsibility.
158        let mut batcher = ValBatcher::<String, String, u64, Diff>::new(None, 0);
159        let mut chunker = ContainerChunker::<Vec<((String, String), u64, Diff)>>::default();
160        chunker.push_into(&mut tuples);
161        while let Some(chunk) = chunker.extract() {
162            batcher.push_into(std::mem::take(chunk));
163        }
164        while let Some(chunk) = chunker.finish() {
165            batcher.push_into(std::mem::take(chunk));
166        }
167        let (mut chain, description) = batcher.seal(Antichain::from_elem(upper));
168        ValBuilder::<String, String, u64, Diff>::seal(&mut chain, description)
169    }
170
171    /// Collects `for_each_diff_pair` invocations into a flat, deterministically
172    /// sorted list for easy assertion.
173    fn collect_diff_pairs<B>(batch: &B) -> Vec<(String, u64, Option<String>, Option<String>)>
174    where
175        B: BatchReader<Diff = Diff>,
176        B::Time: Copy + Into<u64>,
177        B::ValOwn: 'static + Into<String>,
178        <B::KeyContainer as BatchContainer>::Owned: Into<String> + Clone,
179    {
180        let mut out = vec![];
181        for_each_diff_pair(batch, |k, t, dp| {
182            out.push((
183                k.clone().into(),
184                t.into(),
185                dp.before.map(Into::into),
186                dp.after.map(Into::into),
187            ));
188        });
189        out.sort();
190        out
191    }
192
193    #[mz_ore::test]
194    fn single_insertion() {
195        let batch = batch_from_tuples(vec![(("k1".into(), "v1".into()), 5, Diff::ONE)], 6);
196        let pairs = collect_diff_pairs(&batch);
197        assert_eq!(pairs, vec![("k1".into(), 5, None, Some("v1".into()))]);
198    }
199
200    #[mz_ore::test]
201    fn single_retraction() {
202        let batch = batch_from_tuples(vec![(("k1".into(), "v1".into()), 5, -Diff::ONE)], 6);
203        let pairs = collect_diff_pairs(&batch);
204        assert_eq!(pairs, vec![("k1".into(), 5, Some("v1".into()), None)]);
205    }
206
207    #[mz_ore::test]
208    fn update_at_same_timestamp() {
209        // Retract v1 and insert v2 at the same timestamp → paired into a single
210        // DiffPair with both before and after populated.
211        let batch = batch_from_tuples(
212            vec![
213                (("k1".into(), "v1".into()), 5, -Diff::ONE),
214                (("k1".into(), "v2".into()), 5, Diff::ONE),
215            ],
216            6,
217        );
218        let pairs = collect_diff_pairs(&batch);
219        assert_eq!(
220            pairs,
221            vec![("k1".into(), 5, Some("v1".into()), Some("v2".into()))]
222        );
223    }
224
225    #[mz_ore::test]
226    fn update_across_timestamps() {
227        // Insert v1 at t=5, then replace with v2 at t=10.
228        let batch = batch_from_tuples(
229            vec![
230                (("k1".into(), "v1".into()), 5, Diff::ONE),
231                (("k1".into(), "v1".into()), 10, -Diff::ONE),
232                (("k1".into(), "v2".into()), 10, Diff::ONE),
233            ],
234            11,
235        );
236        let pairs = collect_diff_pairs(&batch);
237        assert_eq!(
238            pairs,
239            vec![
240                ("k1".into(), 5, None, Some("v1".into())),
241                ("k1".into(), 10, Some("v1".into()), Some("v2".into())),
242            ]
243        );
244    }
245
246    #[mz_ore::test]
247    fn diff_greater_than_one_fans_out() {
248        // Diff=3 becomes three independent `DiffPair`s at the same timestamp.
249        let batch = batch_from_tuples(vec![(("k1".into(), "v1".into()), 5, Diff::from(3))], 6);
250        let pairs = collect_diff_pairs(&batch);
251        assert_eq!(
252            pairs,
253            vec![
254                ("k1".into(), 5, None, Some("v1".into())),
255                ("k1".into(), 5, None, Some("v1".into())),
256                ("k1".into(), 5, None, Some("v1".into())),
257            ]
258        );
259    }
260
261    #[mz_ore::test]
262    fn multiple_keys_are_independent() {
263        let batch = batch_from_tuples(
264            vec![
265                (("k1".into(), "v1".into()), 5, Diff::ONE),
266                (("k2".into(), "v2".into()), 5, Diff::ONE),
267            ],
268            6,
269        );
270        let pairs = collect_diff_pairs(&batch);
271        assert_eq!(
272            pairs,
273            vec![
274                ("k1".into(), 5, None, Some("v1".into())),
275                ("k2".into(), 5, None, Some("v2".into())),
276            ]
277        );
278    }
279
280    #[mz_ore::test]
281    fn unpaired_before_and_after_at_different_timestamps() {
282        // Retraction at t=5, insertion at t=10 — they do NOT pair because they
283        // live at different timestamps.
284        let batch = batch_from_tuples(
285            vec![
286                (("k1".into(), "v1".into()), 5, -Diff::ONE),
287                (("k1".into(), "v2".into()), 10, Diff::ONE),
288            ],
289            11,
290        );
291        let pairs = collect_diff_pairs(&batch);
292        assert_eq!(
293            pairs,
294            vec![
295                ("k1".into(), 5, Some("v1".into()), None),
296                ("k1".into(), 10, None, Some("v2".into())),
297            ]
298        );
299    }
300}