1use std::collections::BTreeMap;
11use std::iter;
12use std::sync::LazyLock;
13
14use differential_dataflow::trace::implementations::BatchContainer;
15use differential_dataflow::trace::{BatchReader, Cursor};
16use itertools::EitherOrBoth;
17use maplit::btreemap;
18use mz_ore::cast::CastFrom;
19use mz_repr::{
20 CatalogItemId, ColumnName, Datum, Diff, Row, RowPacker, SqlColumnType, SqlScalarType,
21};
22
23use crate::avro::DiffPair;
24
25pub fn for_each_diff_pair<B, F>(batch: &B, mut on_diff_pair: F)
35where
36 B: BatchReader<Diff = Diff>,
37 B::Time: Copy,
38 B::ValOwn: 'static,
39 F: FnMut(&<B::KeyContainer as BatchContainer>::Owned, B::Time, DiffPair<B::ValOwn>),
40{
41 let mut befores: Vec<(B::Time, B::ValOwn, usize)> = vec![];
42 let mut afters: Vec<(B::Time, B::ValOwn, usize)> = vec![];
43
44 let mut cursor = batch.cursor();
45 while cursor.key_valid(batch) {
46 let k = cursor.key(batch);
47
48 while cursor.val_valid(batch) {
51 let v = cursor.val(batch);
52 cursor.map_times(batch, |t, diff| {
53 let diff = B::owned_diff(diff);
54 let update = (
55 B::owned_time(t),
56 B::owned_val(v),
57 usize::cast_from(diff.unsigned_abs()),
58 );
59 if diff < Diff::ZERO {
60 befores.push(update);
61 } else {
62 afters.push(update);
63 }
64 });
65 cursor.step_val(batch);
66 }
67
68 befores.sort_by_key(|(t, _v, _diff)| *t);
69 afters.sort_by_key(|(t, _v, _diff)| *t);
70
71 let fan_out = |(t, v, cnt): (B::Time, B::ValOwn, usize)| iter::repeat_n((t, v), cnt);
77 let befores_iter = befores.drain(..).flat_map(fan_out);
78 let afters_iter = afters.drain(..).flat_map(fan_out);
79
80 let key_owned = <B::KeyContainer as BatchContainer>::into_owned(k);
81
82 for pair in
83 itertools::merge_join_by(befores_iter, afters_iter, |(t1, _v1), (t2, _v2)| t1.cmp(t2))
84 {
85 let (t, before, after) = match pair {
86 EitherOrBoth::Both((t, before), (_t, after)) => (t, Some(before), Some(after)),
87 EitherOrBoth::Left((t, before)) => (t, Some(before), None),
88 EitherOrBoth::Right((t, after)) => (t, None, Some(after)),
89 };
90 on_diff_pair(&key_owned, t, DiffPair { before, after });
91 }
92
93 cursor.step_key(batch);
94 }
95}
96
97pub(crate) const TRANSACTION_TYPE_ID: CatalogItemId = CatalogItemId::Transient(1);
105pub(crate) const DBZ_ROW_TYPE_ID: CatalogItemId = CatalogItemId::Transient(2);
106
107pub static ENVELOPE_CUSTOM_NAMES: LazyLock<BTreeMap<CatalogItemId, String>> = LazyLock::new(|| {
108 btreemap! {
109 TRANSACTION_TYPE_ID => "transaction".into(),
110 DBZ_ROW_TYPE_ID => "row".into(),
111 }
112});
113
114pub(crate) fn dbz_envelope(
115 names_and_types: Vec<(ColumnName, SqlColumnType)>,
116) -> Vec<(ColumnName, SqlColumnType)> {
117 let row = SqlColumnType {
118 nullable: true,
119 scalar_type: SqlScalarType::Record {
120 fields: names_and_types.into(),
121 custom_id: Some(DBZ_ROW_TYPE_ID),
122 },
123 };
124 vec![("before".into(), row.clone()), ("after".into(), row)]
125}
126
127pub fn dbz_format(rp: &mut RowPacker, dp: DiffPair<Row>) {
128 if let Some(before) = dp.before {
129 rp.push_list_with(|rp| rp.extend_by_row(&before));
130 } else {
131 rp.push(Datum::Null);
132 }
133 if let Some(after) = dp.after {
134 rp.push_list_with(|rp| rp.extend_by_row(&after));
135 } else {
136 rp.push(Datum::Null);
137 }
138}
139
#[cfg(test)]
mod tests {
    use differential_dataflow::trace::implementations::chunker::ContainerChunker;
    use differential_dataflow::trace::implementations::{ValBatcher, ValBuilder};
    use differential_dataflow::trace::{Batcher, Builder};
    use timely::container::{ContainerBuilder, PushInto};
    use timely::progress::Antichain;

    use super::*;

    /// Builds a sealed batch from `((key, value), time, diff)` tuples.
    ///
    /// The tuples are pushed through a chunker into a batcher, the batcher is sealed
    /// at the frontier `upper`, and the resulting chain is handed to the builder.
    fn batch_from_tuples(
        mut tuples: Vec<((String, String), u64, Diff)>,
        upper: u64,
    ) -> <ValBuilder<String, String, u64, Diff> as Builder>::Output {
        let mut batcher = ValBatcher::<String, String, u64, Diff>::new(None, 0);
        // NOTE(review): presumably the chunker exists to hand the batcher its input
        // in the chunked form it expects — confirm against the differential-dataflow
        // version in use.
        let mut chunker = ContainerChunker::<Vec<((String, String), u64, Diff)>>::default();
        chunker.push_into(&mut tuples);
        // Forward every chunk that is ready now...
        while let Some(chunk) = chunker.extract() {
            batcher.push_into(std::mem::take(chunk));
        }
        // ...then whatever the chunker still yields once it is finished.
        while let Some(chunk) = chunker.finish() {
            batcher.push_into(std::mem::take(chunk));
        }
        let (mut chain, description) = batcher.seal(Antichain::from_elem(upper));
        ValBuilder::<String, String, u64, Diff>::seal(&mut chain, description)
    }

    /// Runs `for_each_diff_pair` over `batch` and records every callback invocation
    /// as a `(key, time, before, after)` tuple.
    ///
    /// The result is sorted so assertions do not depend on callback order.
    fn collect_diff_pairs<B>(batch: &B) -> Vec<(String, u64, Option<String>, Option<String>)>
    where
        B: BatchReader<Diff = Diff>,
        B::Time: Copy + Into<u64>,
        B::ValOwn: 'static + Into<String>,
        <B::KeyContainer as BatchContainer>::Owned: Into<String> + Clone,
    {
        let mut out = vec![];
        for_each_diff_pair(batch, |k, t, dp| {
            out.push((
                k.clone().into(),
                t.into(),
                dp.before.map(Into::into),
                dp.after.map(Into::into),
            ));
        });
        out.sort();
        out
    }

    /// A lone insertion yields one pair with only the `after` side set.
    #[mz_ore::test]
    fn single_insertion() {
        let batch = batch_from_tuples(vec![(("k1".into(), "v1".into()), 5, Diff::ONE)], 6);
        let pairs = collect_diff_pairs(&batch);
        assert_eq!(pairs, vec![("k1".into(), 5, None, Some("v1".into()))]);
    }

    /// A lone retraction yields one pair with only the `before` side set.
    #[mz_ore::test]
    fn single_retraction() {
        let batch = batch_from_tuples(vec![(("k1".into(), "v1".into()), 5, -Diff::ONE)], 6);
        let pairs = collect_diff_pairs(&batch);
        assert_eq!(pairs, vec![("k1".into(), 5, Some("v1".into()), None)]);
    }

    /// A retraction and an insertion for the same key at the same timestamp are
    /// delivered together as a single pair with both sides set.
    #[mz_ore::test]
    fn update_at_same_timestamp() {
        let batch = batch_from_tuples(
            vec![
                (("k1".into(), "v1".into()), 5, -Diff::ONE),
                (("k1".into(), "v2".into()), 5, Diff::ONE),
            ],
            6,
        );
        let pairs = collect_diff_pairs(&batch);
        assert_eq!(
            pairs,
            vec![("k1".into(), 5, Some("v1".into()), Some("v2".into()))]
        );
    }

    /// An insertion at time 5 followed by a replacement at time 10 yields an
    /// insert-only pair at 5 and a before/after pair at 10.
    #[mz_ore::test]
    fn update_across_timestamps() {
        let batch = batch_from_tuples(
            vec![
                (("k1".into(), "v1".into()), 5, Diff::ONE),
                (("k1".into(), "v1".into()), 10, -Diff::ONE),
                (("k1".into(), "v2".into()), 10, Diff::ONE),
            ],
            11,
        );
        let pairs = collect_diff_pairs(&batch);
        assert_eq!(
            pairs,
            vec![
                ("k1".into(), 5, None, Some("v1".into())),
                ("k1".into(), 10, Some("v1".into()), Some("v2".into())),
            ]
        );
    }

    /// A single update with diff 3 is delivered as three separate insert-only pairs.
    #[mz_ore::test]
    fn diff_greater_than_one_fans_out() {
        let batch = batch_from_tuples(vec![(("k1".into(), "v1".into()), 5, Diff::from(3))], 6);
        let pairs = collect_diff_pairs(&batch);
        assert_eq!(
            pairs,
            vec![
                ("k1".into(), 5, None, Some("v1".into())),
                ("k1".into(), 5, None, Some("v1".into())),
                ("k1".into(), 5, None, Some("v1".into())),
            ]
        );
    }

    /// Insertions for different keys at the same timestamp are not paired with each
    /// other; each key gets its own insert-only pair.
    #[mz_ore::test]
    fn multiple_keys_are_independent() {
        let batch = batch_from_tuples(
            vec![
                (("k1".into(), "v1".into()), 5, Diff::ONE),
                (("k2".into(), "v2".into()), 5, Diff::ONE),
            ],
            6,
        );
        let pairs = collect_diff_pairs(&batch);
        assert_eq!(
            pairs,
            vec![
                ("k1".into(), 5, None, Some("v1".into())),
                ("k2".into(), 5, None, Some("v2".into())),
            ]
        );
    }

    /// A retraction and an insertion for the same key at different timestamps are
    /// not joined: each is delivered alone with the opposite side `None`.
    #[mz_ore::test]
    fn unpaired_before_and_after_at_different_timestamps() {
        let batch = batch_from_tuples(
            vec![
                (("k1".into(), "v1".into()), 5, -Diff::ONE),
                (("k1".into(), "v2".into()), 10, Diff::ONE),
            ],
            11,
        );
        let pairs = collect_diff_pairs(&batch);
        assert_eq!(
            pairs,
            vec![
                ("k1".into(), 5, Some("v1".into()), None),
                ("k1".into(), 10, None, Some("v2".into())),
            ]
        );
    }
}