mz_interchange/
envelopes.rs1use std::collections::BTreeMap;
11use std::iter;
12use std::sync::LazyLock;
13
14use differential_dataflow::lattice::Lattice;
15use differential_dataflow::operators::arrange::Arranged;
16use differential_dataflow::trace::{BatchReader, Cursor, TraceReader};
17use differential_dataflow::{AsCollection, Collection};
18use itertools::{EitherOrBoth, Itertools};
19use maplit::btreemap;
20use mz_ore::cast::CastFrom;
21use mz_repr::{CatalogItemId, ColumnName, ColumnType, Datum, Diff, Row, RowPacker, ScalarType};
22use timely::dataflow::Scope;
23use timely::dataflow::channels::pact::Pipeline;
24use timely::dataflow::operators::Operator;
25
26use crate::avro::DiffPair;
27
28pub fn combine_at_timestamp<G: Scope, Tr>(
35 arranged: Arranged<G, Tr>,
36) -> Collection<G, (Tr::KeyOwn, Vec<DiffPair<Tr::ValOwn>>), Diff>
37where
38 G::Timestamp: Lattice + Copy,
39 Tr: Clone
40 + TraceReader<Diff = Diff, Time = G::Timestamp, KeyOwn: Clone + 'static, ValOwn: 'static>,
41{
42 arranged
43 .stream
44 .unary(Pipeline, "combine_at_timestamp", move |_, _| {
45 move |input, output| {
46 while let Some((cap, batches)) = input.next() {
47 let mut session = output.session(&cap);
48 for batch in batches.drain(..) {
49 let mut befores = vec![];
50 let mut afters = vec![];
51
52 let mut cursor = batch.cursor();
53 while cursor.key_valid(&batch) {
54 let k = cursor.key(&batch);
55
56 while cursor.val_valid(&batch) {
59 let v = cursor.val(&batch);
60 cursor.map_times(&batch, |t, diff| {
61 let diff = Tr::owned_diff(diff);
62 let update = (
63 Tr::owned_time(t),
64 Tr::owned_val(v),
65 usize::cast_from(diff.unsigned_abs()),
66 );
67 if diff < Diff::ZERO {
68 befores.push(update);
69 } else {
70 afters.push(update);
71 }
72 });
73 cursor.step_val(&batch);
74 }
75
76 befores.sort_by_key(|(t, _v, _diff)| *t);
78 afters.sort_by_key(|(t, _v, _diff)| *t);
79
80 let befores = befores
82 .drain(..)
83 .flat_map(|(t, v, cnt)| iter::repeat((t, v)).take(cnt));
84 let afters = afters
85 .drain(..)
86 .flat_map(|(t, v, cnt)| iter::repeat((t, v)).take(cnt));
87
88 let groups = itertools::merge_join_by(
91 befores,
92 afters,
93 |(t1, _v1), (t2, _v2)| t1.cmp(t2),
94 )
95 .map(|pair| match pair {
96 EitherOrBoth::Both((t, before), (_t, after)) => {
97 (t, Some(before.clone()), Some(after.clone()))
98 }
99 EitherOrBoth::Left((t, before)) => (t, Some(before.clone()), None),
100 EitherOrBoth::Right((t, after)) => (t, None, Some(after.clone())),
101 })
102 .chunk_by(|(t, _before, _after)| *t);
103
104 for (t, group) in &groups {
107 let group = group
108 .map(|(_t, before, after)| DiffPair { before, after })
109 .collect();
110 session.give(((Tr::owned_key(k), group), t, Diff::ONE));
111 }
112
113 cursor.step_key(&batch);
114 }
115 }
116 }
117 }
118 })
119 .as_collection()
120}
121
122pub(crate) const TRANSACTION_TYPE_ID: CatalogItemId = CatalogItemId::Transient(1);
130pub(crate) const DBZ_ROW_TYPE_ID: CatalogItemId = CatalogItemId::Transient(2);
131
132pub static ENVELOPE_CUSTOM_NAMES: LazyLock<BTreeMap<CatalogItemId, String>> = LazyLock::new(|| {
133 btreemap! {
134 TRANSACTION_TYPE_ID => "transaction".into(),
135 DBZ_ROW_TYPE_ID => "row".into(),
136 }
137});
138
139pub(crate) fn dbz_envelope(
140 names_and_types: Vec<(ColumnName, ColumnType)>,
141) -> Vec<(ColumnName, ColumnType)> {
142 let row = ColumnType {
143 nullable: true,
144 scalar_type: ScalarType::Record {
145 fields: names_and_types.into(),
146 custom_id: Some(DBZ_ROW_TYPE_ID),
147 },
148 };
149 vec![("before".into(), row.clone()), ("after".into(), row)]
150}
151
152pub fn dbz_format(rp: &mut RowPacker, dp: DiffPair<Row>) {
153 if let Some(before) = dp.before {
154 rp.push_list_with(|rp| rp.extend_by_row(&before));
155 } else {
156 rp.push(Datum::Null);
157 }
158 if let Some(after) = dp.after {
159 rp.push_list_with(|rp| rp.extend_by_row(&after));
160 } else {
161 rp.push(Datum::Null);
162 }
163}