use std::collections::BTreeMap;
use std::iter;
use std::sync::LazyLock;

use differential_dataflow::IntoOwned;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::operators::arrange::Arranged;
use differential_dataflow::trace::{Batch, BatchReader, Cursor, TraceReader};
use differential_dataflow::{AsCollection, Collection};
use itertools::{EitherOrBoth, Itertools};
use maplit::btreemap;
use mz_ore::cast::CastFrom;
use mz_repr::{CatalogItemId, ColumnName, ColumnType, Datum, Diff, Row, RowPacker, ScalarType};
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::operators::Operator;
use timely::dataflow::{Scope, Stream};

use crate::avro::DiffPair;

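/// Combines the updates for each key in `arranged` so that, at every timestamp, all
/// retractions and insertions for that key are paired into [`DiffPair`]s and emitted
/// as a single `(key, Vec<DiffPair<Row>>)` record with a diff of one.
///
/// A retraction with no matching insertion at the same timestamp yields a pair with
/// `after: None`, and an insertion with no matching retraction yields a pair with
/// `before: None`.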
pub fn combine_at_timestamp<G: Scope, Tr>(
    arranged: Arranged<G, Tr>,
) -> Collection<G, (Option<Row>, Vec<DiffPair<Row>>), Diff>
where
    G::Timestamp: Lattice + Copy,
    Tr: Clone
        + for<'a> TraceReader<
            Key<'a> = &'a Option<Row>,
            Val<'a> = &'a Row,
            Time = G::Timestamp,
            Diff = Diff,
        >,
    Tr::Batch: Batch,
    for<'a> Tr::TimeGat<'a>: Ord,
{
    let x: Stream<G, ((Option<Row>, Vec<DiffPair<Row>>), G::Timestamp, Diff)> = arranged
        .stream
        .unary(Pipeline, "combine_at_timestamp", move |_, _| {
            move |input, output| {
                while let Some((cap, batches)) = input.next() {
                    let mut session = output.session(&cap);
                    for batch in batches.drain(..) {
                        let mut befores = vec![];
                        let mut afters = vec![];

                        let mut cursor = batch.cursor();
                        while cursor.key_valid(&batch) {
                            let k = cursor.key(&batch);

                            while cursor.val_valid(&batch) {
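                                // Record each (time, diff) update for this value,
                                // splitting retractions (negative diffs) into `befores`
                                // and insertions (positive diffs) into `afters`, along
                                // with each update's multiplicity.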
                                let v = cursor.val(&batch);
                                cursor.map_times(&batch, |t, diff| {
                                    let diff = diff.into_owned();
                                    let update = (
                                        t.into_owned(),
                                        v.clone(),
                                        usize::cast_from(diff.unsigned_abs()),
                                    );
                                    if diff < Diff::ZERO {
                                        befores.push(update);
                                    } else {
                                        afters.push(update);
                                    }
                                });
                                cursor.step_val(&batch);
                            }

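                            // Sort the retractions and insertions by timestamp so that
                            // updates at the same time can be matched up below.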
                            befores.sort_by_key(|(t, _v, _diff)| *t);
                            afters.sort_by_key(|(t, _v, _diff)| *t);

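                            // Expand each update by its multiplicity, so an update with
                            // an absolute diff of `n` appears `n` times.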
                            let befores = befores
                                .drain(..)
                                .flat_map(|(t, v, cnt)| iter::repeat((t, v)).take(cnt));
                            let afters = afters
                                .drain(..)
                                .flat_map(|(t, v, cnt)| iter::repeat((t, v)).take(cnt));

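                            // Merge the retractions and insertions by timestamp. When a
                            // retraction and an insertion share a timestamp they form one
                            // `(before, after)` pair; otherwise the missing side is `None`.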
                            let groups = itertools::merge_join_by(
                                befores,
                                afters,
                                |(t1, _v1), (t2, _v2)| t1.cmp(t2),
                            )
                            .map(|pair| match pair {
                                EitherOrBoth::Both((t, before), (_t, after)) => {
                                    (t, Some(before.clone()), Some(after.clone()))
                                }
                                EitherOrBoth::Left((t, before)) => (t, Some(before.clone()), None),
                                EitherOrBoth::Right((t, after)) => (t, None, Some(after.clone())),
                            })
                            .group_by(|(t, _before, _after)| *t);

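                            // Emit a single record per timestamp, carrying the key and
                            // every `DiffPair` observed at that time.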
112 for (t, group) in &groups {
115 let group = group
116 .map(|(_t, before, after)| DiffPair { before, after })
117 .collect();
118 session.give(((k.clone(), group), t, Diff::ONE));
119 }
120
121 cursor.step_key(&batch);
122 }
123 }
124 }
125 }
126 });
127 x.as_collection()
128}
129
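/// Transient `CatalogItemId` that [`ENVELOPE_CUSTOM_NAMES`] maps to the custom name
/// `"transaction"`.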
pub(crate) const TRANSACTION_TYPE_ID: CatalogItemId = CatalogItemId::Transient(1);
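/// Transient `CatalogItemId` that tags the `before`/`after` row record type built by
/// [`dbz_envelope`]; [`ENVELOPE_CUSTOM_NAMES`] maps it to the custom name `"row"`.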
pub(crate) const DBZ_ROW_TYPE_ID: CatalogItemId = CatalogItemId::Transient(2);

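/// Custom names for the transient record types defined above.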
pub static ENVELOPE_CUSTOM_NAMES: LazyLock<BTreeMap<CatalogItemId, String>> = LazyLock::new(|| {
    btreemap! {
        TRANSACTION_TYPE_ID => "transaction".into(),
        DBZ_ROW_TYPE_ID => "row".into(),
    }
});

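/// Wraps a relation's columns in the Debezium envelope: a nullable `before` record and
/// a nullable `after` record, each containing the given columns.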
pub(crate) fn dbz_envelope(
    names_and_types: Vec<(ColumnName, ColumnType)>,
) -> Vec<(ColumnName, ColumnType)> {
    let row = ColumnType {
        nullable: true,
        scalar_type: ScalarType::Record {
            fields: names_and_types.into(),
            custom_id: Some(DBZ_ROW_TYPE_ID),
        },
    };
    vec![("before".into(), row.clone()), ("after".into(), row)]
}

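/// Packs a [`DiffPair`] into `rp` in Debezium format: the `before` row as a list (or
/// `Datum::Null` if absent), followed by the `after` row as a list (or `Datum::Null`
/// if absent).
///
/// A minimal usage sketch (the two-column row below is a hypothetical example, not
/// something this module defines):
///
/// ```ignore
/// let mut row = Row::default();
/// let mut packer = row.packer();
/// dbz_format(
///     &mut packer,
///     DiffPair {
///         before: None,
///         after: Some(Row::pack_slice(&[Datum::Int64(1), Datum::String("a")])),
///     },
/// );
/// // `row` now encodes `before: null, after: [1, "a"]`.
/// ```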
pub fn dbz_format(rp: &mut RowPacker, dp: DiffPair<Row>) {
    if let Some(before) = dp.before {
        rp.push_list_with(|rp| rp.extend_by_row(&before));
    } else {
        rp.push(Datum::Null);
    }
    if let Some(after) = dp.after {
        rp.push_list_with(|rp| rp.extend_by_row(&after));
    } else {
        rp.push(Datum::Null);
    }
}