mz_interchange/envelopes.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::collections::BTreeMap;
use std::iter;
use std::sync::LazyLock;

use differential_dataflow::IntoOwned;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::operators::arrange::Arranged;
use differential_dataflow::trace::{Batch, BatchReader, Cursor, TraceReader};
use differential_dataflow::{AsCollection, Collection};
use itertools::{EitherOrBoth, Itertools};
use maplit::btreemap;
use mz_ore::cast::CastFrom;
use mz_repr::{CatalogItemId, ColumnName, ColumnType, Datum, Diff, Row, RowPacker, ScalarType};
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::operators::Operator;
use timely::dataflow::{Scope, Stream};

use crate::avro::DiffPair;
/// Given a stream of batches, produce a stream of groups of `DiffPair`s,
/// grouped by key, at each timestamp.
///
/// This is useful for some sink envelopes (e.g., Debezium and Upsert), which
/// need to do specific logic based on the _entire_ set of before/after diffs
/// for a given key at each timestamp.
pub fn combine_at_timestamp<G: Scope, Tr>(
    arranged: Arranged<G, Tr>,
) -> Collection<G, (Option<Row>, Vec<DiffPair<Row>>), Diff>
where
    G::Timestamp: Lattice + Copy,
    Tr: Clone
        + for<'a> TraceReader<
            Key<'a> = &'a Option<Row>,
            Val<'a> = &'a Row,
            Time = G::Timestamp,
            Diff = Diff,
        >,
    Tr::Batch: Batch,
    for<'a> Tr::TimeGat<'a>: Ord,
{
    let x: Stream<G, ((Option<Row>, Vec<DiffPair<Row>>), G::Timestamp, Diff)> = arranged
        .stream
        .unary(Pipeline, "combine_at_timestamp", move |_, _| {
            move |input, output| {
                while let Some((cap, batches)) = input.next() {
                    let mut session = output.session(&cap);
                    for batch in batches.drain(..) {
                        let mut befores = vec![];
                        let mut afters = vec![];

                        let mut cursor = batch.cursor();
                        while cursor.key_valid(&batch) {
                            let k = cursor.key(&batch);

                            // Partition updates into retractions (befores)
                            // and insertions (afters).
                            while cursor.val_valid(&batch) {
                                let v = cursor.val(&batch);
                                cursor.map_times(&batch, |t, diff| {
                                    let diff = diff.into_owned();
                                    let update = (
                                        t.into_owned(),
                                        v.clone(),
                                        usize::cast_from(diff.unsigned_abs()),
                                    );
                                    if diff < Diff::ZERO {
                                        befores.push(update);
                                    } else {
                                        afters.push(update);
                                    }
                                });
                                cursor.step_val(&batch);
                            }

                            // Sort by timestamp.
                            befores.sort_by_key(|(t, _v, _diff)| *t);
                            afters.sort_by_key(|(t, _v, _diff)| *t);

                            // Convert diff into unary representation.
                            let befores = befores
                                .drain(..)
                                .flat_map(|(t, v, cnt)| iter::repeat((t, v)).take(cnt));
                            let afters = afters
                                .drain(..)
                                .flat_map(|(t, v, cnt)| iter::repeat((t, v)).take(cnt));

                            // At each timestamp, zip together the insertions
                            // and retractions into diff pairs.
                            let groups = itertools::merge_join_by(
                                befores,
                                afters,
                                |(t1, _v1), (t2, _v2)| t1.cmp(t2),
                            )
                            .map(|pair| match pair {
                                EitherOrBoth::Both((t, before), (_t, after)) => {
                                    (t, Some(before.clone()), Some(after.clone()))
                                }
                                EitherOrBoth::Left((t, before)) => (t, Some(before.clone()), None),
                                EitherOrBoth::Right((t, after)) => (t, None, Some(after.clone())),
                            })
                            .group_by(|(t, _before, _after)| *t);

                            // For each timestamp, emit the group of
                            // `DiffPair`s.
                            for (t, group) in &groups {
                                let group = group
                                    .map(|(_t, before, after)| DiffPair { before, after })
                                    .collect();
                                session.give(((k.clone(), group), t, Diff::ONE));
                            }

                            cursor.step_key(&batch);
                        }
                    }
                }
            }
        });
    x.as_collection()
}
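
// Illustrative sketch (not part of the original file): the per-timestamp
// pairing step used above, shown on plain vectors instead of a dataflow
// batch. Retractions ("befores") and insertions ("afters"), each sorted by
// timestamp, are zipped with `merge_join_by`; a matched pair at a timestamp
// becomes an update, while unmatched updates become one-sided pairs. The
// timestamps and string values below are hypothetical.
#[cfg(test)]
mod combine_at_timestamp_example {
    use super::*;

    #[test]
    fn pairs_befores_and_afters_by_timestamp() {
        // One retraction and one insertion at t=1 (an update), plus a lone
        // insertion at t=2 (an insert).
        let befores = vec![(1u64, "old")];
        let afters = vec![(1u64, "new"), (2u64, "fresh")];

        let pairs: Vec<_> =
            itertools::merge_join_by(befores, afters, |(t1, _v1), (t2, _v2)| t1.cmp(t2))
                .map(|pair| match pair {
                    EitherOrBoth::Both((t, before), (_t, after)) => (t, Some(before), Some(after)),
                    EitherOrBoth::Left((t, before)) => (t, Some(before), None),
                    EitherOrBoth::Right((t, after)) => (t, None, Some(after)),
                })
                .collect();

        assert_eq!(
            pairs,
            vec![(1, Some("old"), Some("new")), (2, None, Some("fresh"))]
        );
    }
}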

// NOTE(benesch): statically allocating transient IDs for the
// transaction and row types is a bit of a hack to allow us to attach
// custom names to these types in the generated Avro schema. In the
// future, these types should be real types that get created in the
// catalog with userspace IDs when the user creates the sink, and their
// names and IDs should be plumbed in from the catalog at the moment
// the sink is created.
pub(crate) const TRANSACTION_TYPE_ID: CatalogItemId = CatalogItemId::Transient(1);
pub(crate) const DBZ_ROW_TYPE_ID: CatalogItemId = CatalogItemId::Transient(2);

pub static ENVELOPE_CUSTOM_NAMES: LazyLock<BTreeMap<CatalogItemId, String>> = LazyLock::new(|| {
    btreemap! {
        TRANSACTION_TYPE_ID => "transaction".into(),
        DBZ_ROW_TYPE_ID => "row".into(),
    }
});
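
// Illustrative sketch (not part of the original file): the map above lets
// Avro schema generation recover human-readable names for the two synthetic
// record types identified by the transient IDs.
#[cfg(test)]
mod custom_names_example {
    use super::*;

    #[test]
    fn transient_ids_resolve_to_custom_names() {
        let names = &*ENVELOPE_CUSTOM_NAMES;
        assert_eq!(
            names.get(&TRANSACTION_TYPE_ID).map(String::as_str),
            Some("transaction")
        );
        assert_eq!(names.get(&DBZ_ROW_TYPE_ID).map(String::as_str), Some("row"));
    }
}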

pub(crate) fn dbz_envelope(
    names_and_types: Vec<(ColumnName, ColumnType)>,
) -> Vec<(ColumnName, ColumnType)> {
    let row = ColumnType {
        nullable: true,
        scalar_type: ScalarType::Record {
            fields: names_and_types.into(),
            custom_id: Some(DBZ_ROW_TYPE_ID),
        },
    };
    vec![("before".into(), row.clone()), ("after".into(), row)]
}
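
// Illustrative sketch (not part of the original file): `dbz_envelope` wraps a
// relation's columns in nullable `before`/`after` record columns, as the
// Debezium-style envelope requires. The single `a` column (built with
// `mz_repr`'s `ScalarType::nullable` helper) is an assumption for
// illustration.
#[cfg(test)]
mod dbz_envelope_example {
    use super::*;

    #[test]
    fn wraps_columns_in_before_after_records() {
        let columns = vec![(ColumnName::from("a"), ScalarType::Int64.nullable(false))];
        let envelope = dbz_envelope(columns);

        // Two columns: a nullable `before` record and a nullable `after` record.
        assert_eq!(envelope.len(), 2);
        assert_eq!(envelope[0].0, ColumnName::from("before"));
        assert_eq!(envelope[1].0, ColumnName::from("after"));
        assert!(envelope[0].1.nullable && envelope[1].1.nullable);
    }
}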

pub fn dbz_format(rp: &mut RowPacker, dp: DiffPair<Row>) {
    if let Some(before) = dp.before {
        rp.push_list_with(|rp| rp.extend_by_row(&before));
    } else {
        rp.push(Datum::Null);
    }
    if let Some(after) = dp.after {
        rp.push_list_with(|rp| rp.extend_by_row(&after));
    } else {
        rp.push(Datum::Null);
    }
}
171}