1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
use timely::dataflow::Scope;
use expr::{MapFilterProject, MirScalarExpr, TableFunc};
use repr::{Row, RowArena};
use crate::operator::StreamExt;
use crate::render::context::CollectionBundle;
use crate::render::context::Context;
use repr::DatumVec;
impl<G> Context<G, Row, repr::Timestamp>
where
G: Scope<Timestamp = repr::Timestamp>,
{
/// Renders a flat-map stage: for each input row, evaluates `exprs` against the
/// row, passes the results to the table function `func`, and feeds every row
/// `func` produces (appended to the input row's datums) through `mfp`.
///
/// * `input` - the bundle whose rows are processed.
/// * `func` - the table function applied to the evaluated `exprs`.
/// * `exprs` - scalar expressions evaluated on each input row to form the
///   arguments of `func`.
/// * `mfp` - planned once up front and then evaluated on each combined row.
/// * `input_key` - forwarded to `as_specific_collection` to choose how `input`
///   is read (presumably selecting an arrangement by key; confirm against
///   `CollectionBundle`).
///
/// Returns a bundle built from the successful outputs and from the input's
/// error collection concatenated with the errors produced by this stage.
///
/// A failure while evaluating `exprs` or `func` for a row yields exactly one
/// error record carrying that row's `time` and `diff`, and no output rows.
///
/// # Panics
///
/// Panics with "MapFilterProject planning failed" if `mfp.into_plan()` returns
/// an error.
pub fn render_flat_map(
&mut self,
input: CollectionBundle<G, Row, G::Timestamp>,
func: TableFunc,
exprs: Vec<MirScalarExpr>,
mfp: MapFilterProject,
input_key: Option<Vec<MirScalarExpr>>,
) -> CollectionBundle<G, Row, G::Timestamp> {
// Plan the MFP once, outside the per-record closure.
let mfp_plan = mfp.into_plan().expect("MapFilterProject planning failed");
let (ok_collection, err_collection) = input.as_specific_collection(input_key.as_deref());
let (oks, errs) = ok_collection.inner.flat_map_fallible("FlatMapStage", {
// Created once and moved into the closure below, so both buffers are
// shared by every invocation of the closure rather than rebuilt per record.
let mut datums = DatumVec::new();
let mut row_builder = Row::default();
move |(input_row, time, diff)| {
// Arena handed to every evaluation performed for this input record.
let temp_storage = RowArena::new();
// Unpack the input row's datums and remember how many there are, so the
// columns appended for one output row can be removed before the next.
let mut datums_local = datums.borrow_with(&input_row);
let datums_len = datums_local.len();
// Evaluate the argument expressions; collecting into a `Result` stops at
// the first error.
let exprs = exprs
.iter()
.map(|e| e.eval(&datums_local, &temp_storage))
.collect::<Result<Vec<_>, _>>();
let exprs = match exprs {
Ok(exprs) => exprs,
// Argument evaluation failed: emit a single error for this record.
Err(e) => return vec![(Err((e.into(), time, diff)))],
};
let output_rows = match func.eval(&exprs, &temp_storage) {
Ok(exprs) => exprs,
// The table function failed: emit a single error for this record.
Err(e) => return vec![(Err((e.into(), time, diff)))],
};
// Re-bind as references before building the `move` closure below, so that
// closure captures these borrows instead of taking the values themselves.
let temp_storage = &temp_storage;
let mfp_plan = &mfp_plan;
// NOTE(review): presumably materialized so the produced rows outlive the
// datums that `datums_local` borrows from them -- confirm before changing
// this to iterate `output_rows` directly.
let output_rows_vec: Vec<_> = output_rows.collect();
let row_builder = &mut row_builder;
output_rows_vec
.iter()
.flat_map(move |(output_row, r)| {
// Drop any columns appended while handling the previous output row.
datums_local.truncate(datums_len);
// Append the columns produced by the table function.
datums_local.extend(output_row.iter());
// The diff passed on is the input diff multiplied by `r`, the second
// component of the table function's output (presumably that row's
// multiplicity -- confirm against `TableFunc::eval`).
mfp_plan
.evaluate(
&mut datums_local,
temp_storage,
time,
diff * *r,
row_builder,
)
.collect::<Vec<_>>()
})
.collect::<Vec<_>>()
}
});
// Imported here, next to its only uses: the `as_collection` calls below.
use differential_dataflow::AsCollection;
let ok_collection = oks.as_collection();
let new_err_collection = errs.as_collection();
// Combine the errors already present on the input with those raised here.
let err_collection = err_collection.concat(&new_err_collection);
CollectionBundle::from_collections(ok_collection, err_collection)
}
}