mz_interchange/envelopes.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::collections::BTreeMap;
use std::iter;
use std::sync::LazyLock;

use differential_dataflow::lattice::Lattice;
use differential_dataflow::operators::arrange::Arranged;
use differential_dataflow::trace::{BatchReader, Cursor, TraceReader};
use differential_dataflow::{AsCollection, Collection};
use itertools::{EitherOrBoth, Itertools};
use maplit::btreemap;
use mz_ore::cast::CastFrom;
use mz_repr::{
    CatalogItemId, ColumnName, Datum, Diff, Row, RowPacker, SqlColumnType, SqlScalarType,
};
use timely::dataflow::Scope;
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::operators::Operator;

use crate::avro::DiffPair;

30/// Given a stream of batches, produce a stream of groups of DiffPairs, grouped
31/// by key, at each timestamp.
32///
33// This is useful for some sink envelopes (e.g., Debezium and Upsert), which
34// need to do specific logic based on the _entire_ set of before/after diffs for
35// a given key at each timestamp.
36pub fn combine_at_timestamp<G: Scope, Tr>(
37    arranged: Arranged<G, Tr>,
38) -> Collection<G, (Tr::KeyOwn, Vec<DiffPair<Tr::ValOwn>>), Diff>
39where
40    G::Timestamp: Lattice + Copy,
41    Tr: Clone
42        + TraceReader<Diff = Diff, Time = G::Timestamp, KeyOwn: Clone + 'static, ValOwn: 'static>,
43{
44    arranged
45        .stream
46        .unary(Pipeline, "combine_at_timestamp", move |_, _| {
47            move |input, output| {
48                while let Some((cap, batches)) = input.next() {
49                    let mut session = output.session(&cap);
50                    for batch in batches.drain(..) {
51                        let mut befores = vec![];
52                        let mut afters = vec![];
53
54                        let mut cursor = batch.cursor();
55                        while cursor.key_valid(&batch) {
56                            let k = cursor.key(&batch);
57
58                            // Partition updates into retractions (befores)
59                            // and insertions (afters).
60                            while cursor.val_valid(&batch) {
61                                let v = cursor.val(&batch);
62                                cursor.map_times(&batch, |t, diff| {
63                                    let diff = Tr::owned_diff(diff);
64                                    let update = (
65                                        Tr::owned_time(t),
66                                        Tr::owned_val(v),
67                                        usize::cast_from(diff.unsigned_abs()),
68                                    );
69                                    if diff < Diff::ZERO {
70                                        befores.push(update);
71                                    } else {
72                                        afters.push(update);
73                                    }
74                                });
75                                cursor.step_val(&batch);
76                            }
77
78                            // Sort by timestamp.
79                            befores.sort_by_key(|(t, _v, _diff)| *t);
80                            afters.sort_by_key(|(t, _v, _diff)| *t);
81
82                            // Convert diff into unary representation.
83                            let befores = befores
84                                .drain(..)
85                                .flat_map(|(t, v, cnt)| iter::repeat((t, v)).take(cnt));
86                            let afters = afters
87                                .drain(..)
88                                .flat_map(|(t, v, cnt)| iter::repeat((t, v)).take(cnt));
89
90                            // At each timestamp, zip together the insertions
91                            // and retractions into diff pairs.
92                            let groups = itertools::merge_join_by(
93                                befores,
94                                afters,
95                                |(t1, _v1), (t2, _v2)| t1.cmp(t2),
96                            )
97                            .map(|pair| match pair {
98                                EitherOrBoth::Both((t, before), (_t, after)) => {
99                                    (t, Some(before.clone()), Some(after.clone()))
100                                }
101                                EitherOrBoth::Left((t, before)) => (t, Some(before.clone()), None),
102                                EitherOrBoth::Right((t, after)) => (t, None, Some(after.clone())),
103                            })
104                            .chunk_by(|(t, _before, _after)| *t);
105
106                            // For each timestamp, emit the group of
107                            // `DiffPair`s.
108                            for (t, group) in &groups {
109                                let group = group
110                                    .map(|(_t, before, after)| DiffPair { before, after })
111                                    .collect();
112                                session.give(((Tr::owned_key(k), group), t, Diff::ONE));
113                            }
114
115                            cursor.step_key(&batch);
116                        }
117                    }
118                }
119            }
120        })
121        .as_collection()
122}
123
// NOTE(benesch): statically allocating transient IDs for the
// transaction and row types is a bit of a hack to allow us to attach
// custom names to these types in the generated Avro schema. In the
// future, these types should be real types that get created in the
// catalog with userspace IDs when the user creates the sink, and their
// names and IDs should be plumbed in from the catalog at the moment
// the sink is created.
/// Transient catalog ID under which the envelope's `transaction` record type
/// is given a custom name (see `ENVELOPE_CUSTOM_NAMES`).
pub(crate) const TRANSACTION_TYPE_ID: CatalogItemId = CatalogItemId::Transient(1);
/// Transient catalog ID under which the Debezium `row` record type is given a
/// custom name (see `ENVELOPE_CUSTOM_NAMES`).
pub(crate) const DBZ_ROW_TYPE_ID: CatalogItemId = CatalogItemId::Transient(2);

134pub static ENVELOPE_CUSTOM_NAMES: LazyLock<BTreeMap<CatalogItemId, String>> = LazyLock::new(|| {
135    btreemap! {
136        TRANSACTION_TYPE_ID => "transaction".into(),
137        DBZ_ROW_TYPE_ID => "row".into(),
138    }
139});
140
141pub(crate) fn dbz_envelope(
142    names_and_types: Vec<(ColumnName, SqlColumnType)>,
143) -> Vec<(ColumnName, SqlColumnType)> {
144    let row = SqlColumnType {
145        nullable: true,
146        scalar_type: SqlScalarType::Record {
147            fields: names_and_types.into(),
148            custom_id: Some(DBZ_ROW_TYPE_ID),
149        },
150    };
151    vec![("before".into(), row.clone()), ("after".into(), row)]
152}
153
154pub fn dbz_format(rp: &mut RowPacker, dp: DiffPair<Row>) {
155    if let Some(before) = dp.before {
156        rp.push_list_with(|rp| rp.extend_by_row(&before));
157    } else {
158        rp.push(Datum::Null);
159    }
160    if let Some(after) = dp.after {
161        rp.push_list_with(|rp| rp.extend_by_row(&after));
162    } else {
163        rp.push(Datum::Null);
164    }
165}