mz_interchange/
envelopes.rs1use std::collections::BTreeMap;
11use std::iter;
12use std::sync::LazyLock;
13
14use differential_dataflow::lattice::Lattice;
15use differential_dataflow::operators::arrange::Arranged;
16use differential_dataflow::trace::{BatchReader, Cursor, TraceReader};
17use differential_dataflow::{AsCollection, Collection};
18use itertools::{EitherOrBoth, Itertools};
19use maplit::btreemap;
20use mz_ore::cast::CastFrom;
21use mz_repr::{
22 CatalogItemId, ColumnName, Datum, Diff, Row, RowPacker, SqlColumnType, SqlScalarType,
23};
24use timely::dataflow::Scope;
25use timely::dataflow::channels::pact::Pipeline;
26use timely::dataflow::operators::Operator;
27
28use crate::avro::DiffPair;
29
30pub fn combine_at_timestamp<G: Scope, Tr>(
37 arranged: Arranged<G, Tr>,
38) -> Collection<G, (Tr::KeyOwn, Vec<DiffPair<Tr::ValOwn>>), Diff>
39where
40 G::Timestamp: Lattice + Copy,
41 Tr: Clone
42 + TraceReader<Diff = Diff, Time = G::Timestamp, KeyOwn: Clone + 'static, ValOwn: 'static>,
43{
44 arranged
45 .stream
46 .unary(Pipeline, "combine_at_timestamp", move |_, _| {
47 move |input, output| {
48 while let Some((cap, batches)) = input.next() {
49 let mut session = output.session(&cap);
50 for batch in batches.drain(..) {
51 let mut befores = vec![];
52 let mut afters = vec![];
53
54 let mut cursor = batch.cursor();
55 while cursor.key_valid(&batch) {
56 let k = cursor.key(&batch);
57
58 while cursor.val_valid(&batch) {
61 let v = cursor.val(&batch);
62 cursor.map_times(&batch, |t, diff| {
63 let diff = Tr::owned_diff(diff);
64 let update = (
65 Tr::owned_time(t),
66 Tr::owned_val(v),
67 usize::cast_from(diff.unsigned_abs()),
68 );
69 if diff < Diff::ZERO {
70 befores.push(update);
71 } else {
72 afters.push(update);
73 }
74 });
75 cursor.step_val(&batch);
76 }
77
78 befores.sort_by_key(|(t, _v, _diff)| *t);
80 afters.sort_by_key(|(t, _v, _diff)| *t);
81
82 let befores = befores
84 .drain(..)
85 .flat_map(|(t, v, cnt)| iter::repeat((t, v)).take(cnt));
86 let afters = afters
87 .drain(..)
88 .flat_map(|(t, v, cnt)| iter::repeat((t, v)).take(cnt));
89
90 let groups = itertools::merge_join_by(
93 befores,
94 afters,
95 |(t1, _v1), (t2, _v2)| t1.cmp(t2),
96 )
97 .map(|pair| match pair {
98 EitherOrBoth::Both((t, before), (_t, after)) => {
99 (t, Some(before.clone()), Some(after.clone()))
100 }
101 EitherOrBoth::Left((t, before)) => (t, Some(before.clone()), None),
102 EitherOrBoth::Right((t, after)) => (t, None, Some(after.clone())),
103 })
104 .chunk_by(|(t, _before, _after)| *t);
105
106 for (t, group) in &groups {
109 let group = group
110 .map(|(_t, before, after)| DiffPair { before, after })
111 .collect();
112 session.give(((Tr::owned_key(k), group), t, Diff::ONE));
113 }
114
115 cursor.step_key(&batch);
116 }
117 }
118 }
119 }
120 })
121 .as_collection()
122}
123
124pub(crate) const TRANSACTION_TYPE_ID: CatalogItemId = CatalogItemId::Transient(1);
132pub(crate) const DBZ_ROW_TYPE_ID: CatalogItemId = CatalogItemId::Transient(2);
133
134pub static ENVELOPE_CUSTOM_NAMES: LazyLock<BTreeMap<CatalogItemId, String>> = LazyLock::new(|| {
135 btreemap! {
136 TRANSACTION_TYPE_ID => "transaction".into(),
137 DBZ_ROW_TYPE_ID => "row".into(),
138 }
139});
140
141pub(crate) fn dbz_envelope(
142 names_and_types: Vec<(ColumnName, SqlColumnType)>,
143) -> Vec<(ColumnName, SqlColumnType)> {
144 let row = SqlColumnType {
145 nullable: true,
146 scalar_type: SqlScalarType::Record {
147 fields: names_and_types.into(),
148 custom_id: Some(DBZ_ROW_TYPE_ID),
149 },
150 };
151 vec![("before".into(), row.clone()), ("after".into(), row)]
152}
153
154pub fn dbz_format(rp: &mut RowPacker, dp: DiffPair<Row>) {
155 if let Some(before) = dp.before {
156 rp.push_list_with(|rp| rp.extend_by_row(&before));
157 } else {
158 rp.push(Datum::Null);
159 }
160 if let Some(after) = dp.after {
161 rp.push_list_with(|rp| rp.extend_by_row(&after));
162 } else {
163 rp.push(Datum::Null);
164 }
165}