mz_compute/compute_state/
peek_result_iterator.rs1use std::iter::FusedIterator;
9use std::num::NonZeroI64;
10use std::ops::Range;
11
12use differential_dataflow::trace::implementations::BatchContainer;
13use differential_dataflow::trace::{Cursor, TraceReader};
14use mz_ore::result::ResultExt;
15use mz_repr::fixed_length::ToDatumIter;
16use mz_repr::{DatumVec, Diff, GlobalId, Row, RowArena};
17use timely::order::PartialOrder;
18
19pub struct PeekResultIterator<Tr>
20where
21 Tr: TraceReader,
22{
23 target_id: GlobalId,
25 cursor: Tr::Cursor,
26 storage: Tr::Storage,
27 map_filter_project: mz_expr::SafeMfpPlan,
28 peek_timestamp: mz_repr::Timestamp,
29 row_builder: Row,
30 datum_vec: DatumVec,
31 literals: Option<Literals<Tr>>,
32}
33
34struct Literals<Tr: TraceReader> {
36 literals: Tr::KeyContainer,
38 range: Range<usize>,
40 current_index: Option<usize>,
42}
43
44impl<Tr: TraceReader<KeyContainer: BatchContainer<Owned: Ord>>> Literals<Tr> {
45 fn new(
47 literals: &mut [<Tr::KeyContainer as BatchContainer>::Owned],
48 cursor: &mut Tr::Cursor,
49 storage: &Tr::Storage,
50 ) -> Self {
51 literals.sort();
54 let mut container = Tr::KeyContainer::with_capacity(literals.len());
55 for constraint in literals {
56 container.push_own(constraint)
57 }
58 let range = 0..container.len();
59 let mut this = Self {
60 literals: container,
61 range,
62 current_index: None,
63 };
64 this.seek_next_literal_key(cursor, storage);
65 this
66 }
67
68 fn peek(&self) -> Option<Tr::Key<'_>> {
70 self.current_index
71 .and_then(|index| self.literals.get(index))
72 }
73
74 fn is_exhausted(&self) -> bool {
76 self.current_index.is_none()
77 }
78
79 fn seek_next_literal_key(&mut self, cursor: &mut Tr::Cursor, storage: &Tr::Storage) {
81 while let Some(index) = self.range.next() {
82 let literal = self.literals.get(index).expect("index out of bounds");
83 cursor.seek_key(storage, literal);
84 if cursor.get_key(storage).map_or(true, |key| key == literal) {
85 self.current_index = Some(index);
86 return;
87 }
88 }
92 self.current_index = None;
93 }
94}
95
96impl<Tr> PeekResultIterator<Tr>
101where
102 for<'a> Tr: TraceReader<
103 Key<'a>: ToDatumIter + Eq,
104 KeyContainer: BatchContainer<Owned = Row>,
105 Val<'a>: ToDatumIter,
106 TimeGat<'a>: PartialOrder<mz_repr::Timestamp>,
107 DiffGat<'a> = &'a Diff,
108 >,
109{
110 pub fn new(
111 target_id: GlobalId,
112 map_filter_project: mz_expr::SafeMfpPlan,
113 peek_timestamp: mz_repr::Timestamp,
114 literal_constraints: Option<&mut [Row]>,
115 trace_reader: &mut Tr,
116 ) -> Self {
117 let (mut cursor, storage) = trace_reader.cursor();
118 let literals = literal_constraints
119 .map(|constraints| Literals::new(constraints, &mut cursor, &storage));
120
121 Self {
122 target_id,
123 cursor,
124 storage,
125 map_filter_project,
126 peek_timestamp,
127 row_builder: Row::default(),
128 datum_vec: DatumVec::new(),
129 literals,
130 }
131 }
132
133 fn literals_exhausted(&self) -> bool {
135 self.literals.as_ref().map_or(false, Literals::is_exhausted)
136 }
137}
138
139impl<Tr> FusedIterator for PeekResultIterator<Tr> where
140 for<'a> Tr: TraceReader<
141 Key<'a>: ToDatumIter + Eq,
142 KeyContainer: BatchContainer<Owned = Row>,
143 Val<'a>: ToDatumIter,
144 TimeGat<'a>: PartialOrder<mz_repr::Timestamp>,
145 DiffGat<'a> = &'a Diff,
146 >
147{
148}
149
150impl<Tr> Iterator for PeekResultIterator<Tr>
151where
152 for<'a> Tr: TraceReader<
153 Key<'a>: ToDatumIter + Eq,
154 KeyContainer: BatchContainer<Owned = Row>,
155 Val<'a>: ToDatumIter,
156 TimeGat<'a>: PartialOrder<mz_repr::Timestamp>,
157 DiffGat<'a> = &'a Diff,
158 >,
159{
160 type Item = Result<(Row, NonZeroI64), String>;
161
162 fn next(&mut self) -> Option<Self::Item> {
163 let result = loop {
164 if self.literals_exhausted() {
165 return None;
166 }
167
168 if !self.cursor.key_valid(&self.storage) {
169 return None;
170 }
171
172 if !self.cursor.val_valid(&self.storage) {
173 let exhausted = self.step_key();
174 if exhausted {
175 return None;
176 }
177 }
178
179 match self.extract_current_row() {
180 Ok(Some(row)) => break Ok(row),
181 Ok(None) => {
182 self.cursor.step_val(&self.storage);
184 }
185 Err(err) => break Err(err),
186 }
187 };
188
189 self.cursor.step_val(&self.storage);
190
191 Some(result)
192 }
193}
194
195impl<Tr> PeekResultIterator<Tr>
196where
197 for<'a> Tr: TraceReader<
198 Key<'a>: ToDatumIter + Eq,
199 KeyContainer: BatchContainer<Owned = Row>,
200 Val<'a>: ToDatumIter,
201 TimeGat<'a>: PartialOrder<mz_repr::Timestamp>,
202 DiffGat<'a> = &'a Diff,
203 >,
204{
205 fn extract_current_row(&mut self) -> Result<Option<(Row, NonZeroI64)>, String> {
209 let arena = RowArena::new();
215
216 let key_item = self.cursor.key(&self.storage);
217 let key = key_item.to_datum_iter();
218 let row_item = self.cursor.val(&self.storage);
219 let row = row_item.to_datum_iter();
220
221 let maybe_literal;
224 let mut borrow = self.datum_vec.borrow();
225 borrow.extend(key);
226 borrow.extend(row);
227
228 if let Some(literals) = &mut self.literals
229 && let Some(literal) = literals.peek()
230 {
231 maybe_literal = literal;
234 borrow.extend(maybe_literal.to_datum_iter());
235 }
236 if let Some(result) = self
237 .map_filter_project
238 .evaluate_into(&mut borrow, &arena, &mut self.row_builder)
239 .map(|row| row.cloned())
240 .map_err_to_string_with_causes()?
241 {
242 let mut copies = Diff::ZERO;
243 self.cursor.map_times(&self.storage, |time, diff| {
244 if time.less_equal(&self.peek_timestamp) {
245 copies += diff;
246 }
247 });
248 let copies: i64 = if copies.is_negative() {
249 let row = &*borrow;
250 tracing::error!(
251 target = %self.target_id, diff = %copies, ?row,
252 "index peek encountered negative multiplicities in ok trace",
253 );
254 return Err(format!(
255 "Invalid data in source, \
256 saw retractions ({}) for row that does not exist: {:?}",
257 -copies, row,
258 ));
259 } else {
260 copies.into_inner()
261 };
262 if let Some(copies) = NonZeroI64::new(copies) {
264 Ok(Some((result, copies)))
265 } else {
266 Ok(None)
267 }
268 } else {
269 Ok(None)
270 }
271 }
272
273 fn step_key(&mut self) -> bool {
277 assert!(
278 !self.cursor.val_valid(&self.storage),
279 "must only step key when the vals for a key are exhausted"
280 );
281
282 if let Some(literals) = &mut self.literals {
283 literals.seek_next_literal_key(&mut self.cursor, &self.storage);
284
285 if literals.is_exhausted() {
286 return true;
287 }
288 } else {
289 self.cursor.step_key(&self.storage);
290 }
291
292 if !self.cursor.key_valid(&self.storage) {
293 return true;
295 }
296
297 assert!(
298 self.cursor.val_valid(&self.storage),
299 "there must always be at least one val per key"
300 );
301
302 false
303 }
304}