mz_interchange/
envelopes.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::BTreeMap;
11use std::iter;
12use std::sync::LazyLock;
13
14use differential_dataflow::lattice::Lattice;
15use differential_dataflow::operators::arrange::Arranged;
16use differential_dataflow::trace::{BatchReader, Cursor, TraceReader};
17use differential_dataflow::{AsCollection, Collection};
18use itertools::{EitherOrBoth, Itertools};
19use maplit::btreemap;
20use mz_ore::cast::CastFrom;
21use mz_repr::{CatalogItemId, ColumnName, ColumnType, Datum, Diff, Row, RowPacker, ScalarType};
22use timely::dataflow::Scope;
23use timely::dataflow::channels::pact::Pipeline;
24use timely::dataflow::operators::Operator;
25
26use crate::avro::DiffPair;
27
28/// Given a stream of batches, produce a stream of groups of DiffPairs, grouped
29/// by key, at each timestamp.
30///
31// This is useful for some sink envelopes (e.g., Debezium and Upsert), which
32// need to do specific logic based on the _entire_ set of before/after diffs for
33// a given key at each timestamp.
34pub fn combine_at_timestamp<G: Scope, Tr>(
35    arranged: Arranged<G, Tr>,
36) -> Collection<G, (Tr::KeyOwn, Vec<DiffPair<Tr::ValOwn>>), Diff>
37where
38    G::Timestamp: Lattice + Copy,
39    Tr: Clone
40        + TraceReader<Diff = Diff, Time = G::Timestamp, KeyOwn: Clone + 'static, ValOwn: 'static>,
41{
42    arranged
43        .stream
44        .unary(Pipeline, "combine_at_timestamp", move |_, _| {
45            move |input, output| {
46                while let Some((cap, batches)) = input.next() {
47                    let mut session = output.session(&cap);
48                    for batch in batches.drain(..) {
49                        let mut befores = vec![];
50                        let mut afters = vec![];
51
52                        let mut cursor = batch.cursor();
53                        while cursor.key_valid(&batch) {
54                            let k = cursor.key(&batch);
55
56                            // Partition updates into retractions (befores)
57                            // and insertions (afters).
58                            while cursor.val_valid(&batch) {
59                                let v = cursor.val(&batch);
60                                cursor.map_times(&batch, |t, diff| {
61                                    let diff = Tr::owned_diff(diff);
62                                    let update = (
63                                        Tr::owned_time(t),
64                                        Tr::owned_val(v),
65                                        usize::cast_from(diff.unsigned_abs()),
66                                    );
67                                    if diff < Diff::ZERO {
68                                        befores.push(update);
69                                    } else {
70                                        afters.push(update);
71                                    }
72                                });
73                                cursor.step_val(&batch);
74                            }
75
76                            // Sort by timestamp.
77                            befores.sort_by_key(|(t, _v, _diff)| *t);
78                            afters.sort_by_key(|(t, _v, _diff)| *t);
79
80                            // Convert diff into unary representation.
81                            let befores = befores
82                                .drain(..)
83                                .flat_map(|(t, v, cnt)| iter::repeat((t, v)).take(cnt));
84                            let afters = afters
85                                .drain(..)
86                                .flat_map(|(t, v, cnt)| iter::repeat((t, v)).take(cnt));
87
88                            // At each timestamp, zip together the insertions
89                            // and retractions into diff pairs.
90                            let groups = itertools::merge_join_by(
91                                befores,
92                                afters,
93                                |(t1, _v1), (t2, _v2)| t1.cmp(t2),
94                            )
95                            .map(|pair| match pair {
96                                EitherOrBoth::Both((t, before), (_t, after)) => {
97                                    (t, Some(before.clone()), Some(after.clone()))
98                                }
99                                EitherOrBoth::Left((t, before)) => (t, Some(before.clone()), None),
100                                EitherOrBoth::Right((t, after)) => (t, None, Some(after.clone())),
101                            })
102                            .chunk_by(|(t, _before, _after)| *t);
103
104                            // For each timestamp, emit the group of
105                            // `DiffPair`s.
106                            for (t, group) in &groups {
107                                let group = group
108                                    .map(|(_t, before, after)| DiffPair { before, after })
109                                    .collect();
110                                session.give(((Tr::owned_key(k), group), t, Diff::ONE));
111                            }
112
113                            cursor.step_key(&batch);
114                        }
115                    }
116                }
117            }
118        })
119        .as_collection()
120}
121
122// NOTE(benesch): statically allocating transient IDs for the
123// transaction and row types is a bit of a hack to allow us to attach
124// custom names to these types in the generated Avro schema. In the
125// future, these types should be real types that get created in the
126// catalog with userspace IDs when the user creates the sink, and their
127// names and IDs should be plumbed in from the catalog at the moment
128// the sink is created.
129pub(crate) const TRANSACTION_TYPE_ID: CatalogItemId = CatalogItemId::Transient(1);
130pub(crate) const DBZ_ROW_TYPE_ID: CatalogItemId = CatalogItemId::Transient(2);
131
132pub static ENVELOPE_CUSTOM_NAMES: LazyLock<BTreeMap<CatalogItemId, String>> = LazyLock::new(|| {
133    btreemap! {
134        TRANSACTION_TYPE_ID => "transaction".into(),
135        DBZ_ROW_TYPE_ID => "row".into(),
136    }
137});
138
139pub(crate) fn dbz_envelope(
140    names_and_types: Vec<(ColumnName, ColumnType)>,
141) -> Vec<(ColumnName, ColumnType)> {
142    let row = ColumnType {
143        nullable: true,
144        scalar_type: ScalarType::Record {
145            fields: names_and_types.into(),
146            custom_id: Some(DBZ_ROW_TYPE_ID),
147        },
148    };
149    vec![("before".into(), row.clone()), ("after".into(), row)]
150}
151
152pub fn dbz_format(rp: &mut RowPacker, dp: DiffPair<Row>) {
153    if let Some(before) = dp.before {
154        rp.push_list_with(|rp| rp.extend_by_row(&before));
155    } else {
156        rp.push(Datum::Null);
157    }
158    if let Some(after) = dp.after {
159        rp.push_list_with(|rp| rp.extend_by_row(&after));
160    } else {
161        rp.push(Datum::Null);
162    }
163}