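//! Utilities for sink envelopes: grouping a stream of updates by timestamp
//! and key, and rendering the grouped updates in the Debezium and upsert
//! formats.
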
use std::iter;
use std::rc::Rc;

use differential_dataflow::{
    lattice::Lattice,
    trace::BatchReader,
    trace::{implementations::ord::OrdValBatch, Cursor},
};
use differential_dataflow::{AsCollection, Collection};
use itertools::Itertools;
use ore::collections::CollectionExt;
use repr::{ColumnType, Datum, Diff, RelationDesc, RelationType, Row, ScalarType};
use timely::dataflow::{channels::pact::Pipeline, operators::Operator, Scope, Stream};

use crate::avro::DiffPair;
use crate::encode::column_names_and_types;
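
/// Combines a stream of arrangement batches into, for each key and timestamp,
/// the complete set of changes at that timestamp, expressed as a list of
/// before/after [`DiffPair`]s. Envelopes like Debezium need to see every
/// update for a (key, timestamp) at once, rather than as individual diffs.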
pub fn combine_at_timestamp<G: Scope>(
batches: Stream<G, Rc<OrdValBatch<Option<Row>, Row, G::Timestamp, Diff>>>,
) -> Collection<G, (Option<Row>, Vec<DiffPair<Row>>), Diff>
where
G::Timestamp: Lattice + Copy,
{
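    // Scratch buffer for draining input batches, reused across invocations.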
let mut rows_buf = vec![];
let x: Stream<G, ((Option<Row>, Vec<DiffPair<Row>>), G::Timestamp, Diff)> =
batches.unary(Pipeline, "combine_at_timestamp", move |_, _| {
move |input, output| {
while let Some((cap, batches)) = input.next() {
let mut session = output.session(&cap);
batches.swap(&mut rows_buf);
for batch in rows_buf.drain(..) {
let mut cursor = batch.cursor();
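                        // Flatten the batch into (time, key, diff, row) tuples
                        // so they can be sorted and grouped below.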
let mut buf: Vec<(G::Timestamp, Option<&Row>, Diff, &Row)> = vec![];
while cursor.key_valid(&batch) {
let k = cursor.key(&batch);
while cursor.val_valid(&batch) {
let val = cursor.val(&batch);
cursor.map_times(&batch, |&t, &diff| {
buf.push((t, k.as_ref(), diff, val));
});
cursor.step_val(&batch);
}
cursor.step_key(&batch);
}
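                        // Sort so that each (timestamp, key) group is
                        // contiguous and, within a group, retractions
                        // (negative diffs) precede insertions.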
                        buf.sort_by_key(|(t, k, diff, _row)| (*t, *k, *diff));
for ((t, k), group) in &buf
.into_iter()
.group_by(|(t, k, _diff, _row)| (*t, k.clone()))
{
let mut out = vec![];
let elts: Vec<(G::Timestamp, Option<&Row>, Diff, &Row)> =
group.collect();
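                            // The group is sorted by diff, so the failed
                            // binary search for 0 yields the index of the
                            // first insertion: everything before it is a
                            // retraction.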
let pos_idx = elts
.binary_search_by(|(_t, _k, diff, _row)| diff.cmp(&0))
.expect_err("there should be no zero-multiplicity entries");
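                            // Expand each entry by its multiplicity, so that a
                            // diff of -n or +n contributes n copies.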
let befores = elts[0..pos_idx]
.iter()
.flat_map(|elt| iter::repeat(elt).take(elt.2.abs() as usize));
let afters = elts[pos_idx..]
.iter()
.flat_map(|elt| iter::repeat(elt).take(elt.2 as usize));
debug_assert!(befores.clone().all(|(_, _, diff, _)| *diff < 0));
debug_assert!(afters.clone().all(|(_, _, diff, _)| *diff > 0));
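                            // Pair each retracted row with an inserted row.
                            // Unmatched retractions become deletes (no after)
                            // and unmatched insertions become creates (no
                            // before).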
for pair in befores.zip_longest(afters) {
let (before, after) = match pair {
itertools::EitherOrBoth::Both(before, after) => {
(Some(before.3.clone()), Some(after.3.clone()))
}
itertools::EitherOrBoth::Left(before) => {
(Some(before.3.clone()), None)
}
itertools::EitherOrBoth::Right(after) => {
(None, Some(after.3.clone()))
}
};
out.push(DiffPair { before, after });
}
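                            // Emit a single record carrying every change for
                            // this key at this timestamp.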
session.give(((k.cloned(), out), t, 1));
}
}
}
}
});
x.as_collection()
}
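
/// Given the description of a relation, returns the description of the
/// corresponding Debezium-formatted relation: two nullable record columns,
/// `before` and `after`, each containing the original relation's columns as
/// fields.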
pub fn dbz_desc(desc: RelationDesc) -> RelationDesc {
let cols = column_names_and_types(desc);
let row = ColumnType {
nullable: true,
scalar_type: ScalarType::Record {
fields: cols.into_iter().collect(),
custom_oid: None,
custom_name: Some("row".to_owned()),
},
};
let typ = RelationType::new(vec![row.clone(), row]);
RelationDesc::new(typ, ["before", "after"])
}
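
/// Packs a [`DiffPair`] in the Debezium format: a `before` field followed by
/// an `after` field, each either a list of the row's datums or `Null` when
/// that side of the pair is absent.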
pub fn dbz_format(rp: &mut Row, dp: DiffPair<Row>) -> Row {
if let Some(before) = dp.before {
rp.push_list_with(|rp| rp.extend_by_row(&before));
} else {
rp.push(Datum::Null);
}
if let Some(after) = dp.after {
rp.push_list_with(|rp| rp.extend_by_row(&after));
} else {
rp.push(Datum::Null);
}
rp.finish_and_reuse()
}
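
/// Extracts the row to emit for the upsert format: the `after` side of the
/// single update for this key and timestamp. Panics if more than one update is
/// present, which indicates that the sink key is not a primary key of the
/// relation.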
pub fn upsert_format(dps: Vec<DiffPair<Row>>) -> Option<Row> {
let dp = dps.expect_element(
"primary key error: expected at most one update \
per key and timestamp. This can happen when the configured sink key is \
not a primary key of the sinked relation.",
);
dp.after
}
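
// A minimal sketch of `dbz_format`'s output shape, assuming `Row::default`,
// `Row::pack`, and `Row::unpack` behave as in the `repr` crate; this test is
// an illustration added for this document, not part of the original module.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn dbz_format_uses_null_for_missing_sides() {
        let mut packer = Row::default();
        let after = Row::pack(Some(Datum::Int64(42)));
        let out = dbz_format(
            &mut packer,
            DiffPair {
                before: None,
                after: Some(after),
            },
        );
        let datums = out.unpack();
        // An insert has no `before`, so the first field is Null and the
        // second is the packed `after` row.
        assert_eq!(datums[0], Datum::Null);
        assert!(matches!(datums[1], Datum::List(_)));
    }
}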