1use std::collections::BTreeMap;
11use std::iter;
12use std::sync::LazyLock;
13
14use differential_dataflow::trace::implementations::BatchContainer;
15use differential_dataflow::trace::{BatchReader, Cursor};
16use itertools::EitherOrBoth;
17use maplit::btreemap;
18use mz_ore::cast::CastFrom;
19use mz_repr::{
20 CatalogItemId, ColumnName, Datum, Diff, Row, RowPacker, SqlColumnType, SqlScalarType,
21};
22
23use crate::avro::DiffPair;
24
25pub fn for_each_diff_pair<B, F>(batch: &B, mut on_diff_pair: F)
35where
36 B: BatchReader<Diff = Diff>,
37 B::Time: Copy,
38 B::ValOwn: 'static,
39 F: FnMut(&<B::KeyContainer as BatchContainer>::Owned, B::Time, DiffPair<B::ValOwn>),
40{
41 let mut befores: Vec<(B::Time, B::ValOwn, usize)> = vec![];
42 let mut afters: Vec<(B::Time, B::ValOwn, usize)> = vec![];
43
44 let mut cursor = batch.cursor();
45 while cursor.key_valid(batch) {
46 let k = cursor.key(batch);
47
48 while cursor.val_valid(batch) {
51 let v = cursor.val(batch);
52 cursor.map_times(batch, |t, diff| {
53 let diff = B::owned_diff(diff);
54 let update = (
55 B::owned_time(t),
56 B::owned_val(v),
57 usize::cast_from(diff.unsigned_abs()),
58 );
59 if diff < Diff::ZERO {
60 befores.push(update);
61 } else {
62 afters.push(update);
63 }
64 });
65 cursor.step_val(batch);
66 }
67
68 befores.sort_by_key(|(t, _v, _diff)| *t);
69 afters.sort_by_key(|(t, _v, _diff)| *t);
70
71 let fan_out = |(t, v, cnt): (B::Time, B::ValOwn, usize)| iter::repeat_n((t, v), cnt);
77 let befores_iter = befores.drain(..).flat_map(fan_out);
78 let afters_iter = afters.drain(..).flat_map(fan_out);
79
80 let key_owned = <B::KeyContainer as BatchContainer>::into_owned(k);
81
82 for pair in
83 itertools::merge_join_by(befores_iter, afters_iter, |(t1, _v1), (t2, _v2)| t1.cmp(t2))
84 {
85 let (t, before, after) = match pair {
86 EitherOrBoth::Both((t, before), (_t, after)) => (t, Some(before), Some(after)),
87 EitherOrBoth::Left((t, before)) => (t, Some(before), None),
88 EitherOrBoth::Right((t, after)) => (t, None, Some(after)),
89 };
90 on_diff_pair(&key_owned, t, DiffPair { before, after });
91 }
92
93 cursor.step_key(batch);
94 }
95}
96
/// Statically chosen transient catalog ID that [`ENVELOPE_CUSTOM_NAMES`] maps to the custom
/// name `"transaction"`.
///
/// NOTE(review): presumably this must not collide with transient IDs allocated elsewhere;
/// confirm against the allocator.
pub(crate) const TRANSACTION_TYPE_ID: CatalogItemId = CatalogItemId::Transient(1);

/// Statically chosen transient catalog ID that [`dbz_envelope`] attaches as the `custom_id` of
/// the record type it builds; [`ENVELOPE_CUSTOM_NAMES`] maps it to the custom name `"row"`.
pub(crate) const DBZ_ROW_TYPE_ID: CatalogItemId = CatalogItemId::Transient(2);
106
107pub static ENVELOPE_CUSTOM_NAMES: LazyLock<BTreeMap<CatalogItemId, String>> = LazyLock::new(|| {
108 btreemap! {
109 TRANSACTION_TYPE_ID => "transaction".into(),
110 DBZ_ROW_TYPE_ID => "row".into(),
111 }
112});
113
114pub(crate) fn dbz_envelope(
115 names_and_types: Vec<(ColumnName, SqlColumnType)>,
116) -> Vec<(ColumnName, SqlColumnType)> {
117 let row = SqlColumnType {
118 nullable: true,
119 scalar_type: SqlScalarType::Record {
120 fields: names_and_types.into(),
121 custom_id: Some(DBZ_ROW_TYPE_ID),
122 },
123 };
124 vec![("before".into(), row.clone()), ("after".into(), row)]
125}
126
127pub fn dbz_format(rp: &mut RowPacker, dp: DiffPair<Row>) {
128 if let Some(before) = dp.before {
129 rp.push_list_with(|rp| rp.extend_by_row(&before));
130 } else {
131 rp.push(Datum::Null);
132 }
133 if let Some(after) = dp.after {
134 rp.push_list_with(|rp| rp.extend_by_row(&after));
135 } else {
136 rp.push(Datum::Null);
137 }
138}
139
#[cfg(test)]
mod tests {
    use differential_dataflow::trace::Batcher;
    use differential_dataflow::trace::implementations::{ValBatcher, ValBuilder};
    use timely::progress::Antichain;

    use super::*;

    /// Batcher used by every test: `String` keys and values, `u64` times, [`Diff`] diffs.
    type TestBatcher = ValBatcher<String, String, u64, Diff>;
    /// Builder matching [`TestBatcher`].
    type TestBuilder = ValBuilder<String, String, u64, Diff>;
    /// One input update in `((key, value), time, diff)` form.
    type Update = ((String, String), u64, Diff);
    /// One observed callback invocation: `(key, time, before, after)`.
    type Emitted = (String, u64, Option<String>, Option<String>);

    /// Builds a single input update from string slices.
    fn update(key: &str, val: &str, time: u64, diff: Diff) -> Update {
        ((key.to_owned(), val.to_owned()), time, diff)
    }

    /// Builds a single expected output row from string slices.
    fn emitted(key: &str, time: u64, before: Option<&str>, after: Option<&str>) -> Emitted {
        (
            key.to_owned(),
            time,
            before.map(str::to_owned),
            after.map(str::to_owned),
        )
    }

    /// Pushes `tuples` into a fresh batcher and seals it at the frontier `upper`.
    fn batch_from_tuples(
        mut tuples: Vec<Update>,
        upper: u64,
    ) -> <TestBuilder as differential_dataflow::trace::Builder>::Output {
        let frontier = Antichain::from_elem(upper);
        let mut batcher = TestBatcher::new(None, 0);
        batcher.push_container(&mut tuples);
        batcher.seal::<TestBuilder>(frontier)
    }

    /// Runs [`for_each_diff_pair`] over `batch` and returns everything it emitted, sorted so
    /// that assertions do not depend on emission order.
    fn collect_diff_pairs<B>(batch: &B) -> Vec<Emitted>
    where
        B: BatchReader<Diff = Diff>,
        B::Time: Copy + Into<u64>,
        B::ValOwn: 'static + Into<String>,
        <B::KeyContainer as BatchContainer>::Owned: Into<String> + Clone,
    {
        let mut collected = Vec::new();
        for_each_diff_pair(batch, |key, time, pair| {
            let DiffPair { before, after } = pair;
            collected.push((
                key.clone().into(),
                time.into(),
                before.map(Into::into),
                after.map(Into::into),
            ));
        });
        collected.sort();
        collected
    }

    /// A lone insertion yields a pair with only `after` set.
    #[mz_ore::test]
    fn single_insertion() {
        let batch = batch_from_tuples(vec![update("k1", "v1", 5, Diff::ONE)], 6);

        let expected = vec![emitted("k1", 5, None, Some("v1"))];
        assert_eq!(collect_diff_pairs(&batch), expected);
    }

    /// A lone retraction yields a pair with only `before` set.
    #[mz_ore::test]
    fn single_retraction() {
        let batch = batch_from_tuples(vec![update("k1", "v1", 5, -Diff::ONE)], 6);

        let expected = vec![emitted("k1", 5, Some("v1"), None)];
        assert_eq!(collect_diff_pairs(&batch), expected);
    }

    /// A retraction and an insertion at the same time are joined into one pair.
    #[mz_ore::test]
    fn update_at_same_timestamp() {
        let input = vec![
            update("k1", "v1", 5, -Diff::ONE),
            update("k1", "v2", 5, Diff::ONE),
        ];
        let batch = batch_from_tuples(input, 6);

        let expected = vec![emitted("k1", 5, Some("v1"), Some("v2"))];
        assert_eq!(collect_diff_pairs(&batch), expected);
    }

    /// An insert at one time followed by a replace at a later time yields two pairs.
    #[mz_ore::test]
    fn update_across_timestamps() {
        let input = vec![
            update("k1", "v1", 5, Diff::ONE),
            update("k1", "v1", 10, -Diff::ONE),
            update("k1", "v2", 10, Diff::ONE),
        ];
        let batch = batch_from_tuples(input, 11);

        let expected = vec![
            emitted("k1", 5, None, Some("v1")),
            emitted("k1", 10, Some("v1"), Some("v2")),
        ];
        assert_eq!(collect_diff_pairs(&batch), expected);
    }

    /// A diff of 3 is expanded into three identical pairs.
    #[mz_ore::test]
    fn diff_greater_than_one_fans_out() {
        let batch = batch_from_tuples(vec![update("k1", "v1", 5, Diff::from(3))], 6);

        let expected = vec![
            emitted("k1", 5, None, Some("v1")),
            emitted("k1", 5, None, Some("v1")),
            emitted("k1", 5, None, Some("v1")),
        ];
        assert_eq!(collect_diff_pairs(&batch), expected);
    }

    /// Updates for different keys are never joined with each other.
    #[mz_ore::test]
    fn multiple_keys_are_independent() {
        let input = vec![
            update("k1", "v1", 5, Diff::ONE),
            update("k2", "v2", 5, Diff::ONE),
        ];
        let batch = batch_from_tuples(input, 6);

        let expected = vec![
            emitted("k1", 5, None, Some("v1")),
            emitted("k2", 5, None, Some("v2")),
        ];
        assert_eq!(collect_diff_pairs(&batch), expected);
    }

    /// A retraction and an insertion at different times stay unpaired.
    #[mz_ore::test]
    fn unpaired_before_and_after_at_different_timestamps() {
        let input = vec![
            update("k1", "v1", 5, -Diff::ONE),
            update("k1", "v2", 10, Diff::ONE),
        ];
        let batch = batch_from_tuples(input, 11);

        let expected = vec![
            emitted("k1", 5, Some("v1"), None),
            emitted("k1", 10, None, Some("v2")),
        ];
        assert_eq!(collect_diff_pairs(&batch), expected);
    }
}