mz_compute/compute_state/
peek_result_iterator.rs1use std::iter::FusedIterator;
9use std::num::NonZeroI64;
10use std::ops::Range;
11
12use differential_dataflow::trace::implementations::BatchContainer;
13use differential_dataflow::trace::{Cursor, TraceReader};
14use mz_ore::result::ResultExt;
15use mz_repr::fixed_length::ExtendDatums;
16use mz_repr::{DatumVec, Diff, GlobalId, Row, RowArena};
17use timely::order::PartialOrder;
18
19pub struct PeekResultIterator<Tr>
20where
21 Tr: TraceReader,
22{
23 target_id: GlobalId,
25 cursor: Tr::Cursor,
26 storage: Tr::Storage,
27 map_filter_project: mz_expr::SafeMfpPlan,
28 peek_timestamp: mz_repr::Timestamp,
29 row_builder: Row,
30 datum_vec: DatumVec,
31 literals: Option<Literals<Tr>>,
32}
33
34struct Literals<Tr: TraceReader> {
36 literals: Tr::KeyContainer,
38 range: Range<usize>,
40 current_index: Option<usize>,
42}
43
44impl<Tr: TraceReader<KeyContainer: BatchContainer<Owned: Ord>>> Literals<Tr> {
45 fn new(
47 literals: &mut [<Tr::KeyContainer as BatchContainer>::Owned],
48 cursor: &mut Tr::Cursor,
49 storage: &Tr::Storage,
50 ) -> Self {
51 literals.sort();
54 let mut container = Tr::KeyContainer::with_capacity(literals.len());
55 for constraint in literals {
56 container.push_own(constraint)
57 }
58 let range = 0..container.len();
59 let mut this = Self {
60 literals: container,
61 range,
62 current_index: None,
63 };
64 this.seek_next_literal_key(cursor, storage);
65 this
66 }
67
68 fn peek(&self) -> Option<Tr::Key<'_>> {
70 self.current_index
71 .and_then(|index| self.literals.get(index))
72 }
73
74 fn is_exhausted(&self) -> bool {
76 self.current_index.is_none()
77 }
78
79 fn seek_next_literal_key(&mut self, cursor: &mut Tr::Cursor, storage: &Tr::Storage) {
81 while let Some(index) = self.range.next() {
82 let literal = self.literals.get(index).expect("index out of bounds");
83 cursor.seek_key(storage, literal);
84 if cursor.get_key(storage).map_or(true, |key| key == literal) {
85 self.current_index = Some(index);
86 return;
87 }
88 }
92 self.current_index = None;
93 }
94}
95
96impl<Tr> PeekResultIterator<Tr>
101where
102 for<'a> Tr: TraceReader<
103 Key<'a>: ExtendDatums + Eq,
104 KeyContainer: BatchContainer<Owned = Row>,
105 Val<'a>: ExtendDatums,
106 TimeGat<'a>: PartialOrder<mz_repr::Timestamp>,
107 DiffGat<'a> = &'a Diff,
108 >,
109{
110 pub fn new(
111 target_id: GlobalId,
112 map_filter_project: mz_expr::SafeMfpPlan,
113 peek_timestamp: mz_repr::Timestamp,
114 literal_constraints: Option<&mut [Row]>,
115 trace_reader: &mut Tr,
116 ) -> Self {
117 let (mut cursor, storage) = trace_reader.cursor();
118 let literals = literal_constraints
119 .map(|constraints| Literals::new(constraints, &mut cursor, &storage));
120
121 Self {
122 target_id,
123 cursor,
124 storage,
125 map_filter_project,
126 peek_timestamp,
127 row_builder: Row::default(),
128 datum_vec: DatumVec::new(),
129 literals,
130 }
131 }
132
133 fn literals_exhausted(&self) -> bool {
135 self.literals.as_ref().map_or(false, Literals::is_exhausted)
136 }
137}
138
139impl<Tr> FusedIterator for PeekResultIterator<Tr> where
140 for<'a> Tr: TraceReader<
141 Key<'a>: ExtendDatums + Eq,
142 KeyContainer: BatchContainer<Owned = Row>,
143 Val<'a>: ExtendDatums,
144 TimeGat<'a>: PartialOrder<mz_repr::Timestamp>,
145 DiffGat<'a> = &'a Diff,
146 >
147{
148}
149
150impl<Tr> Iterator for PeekResultIterator<Tr>
151where
152 for<'a> Tr: TraceReader<
153 Key<'a>: ExtendDatums + Eq,
154 KeyContainer: BatchContainer<Owned = Row>,
155 Val<'a>: ExtendDatums,
156 TimeGat<'a>: PartialOrder<mz_repr::Timestamp>,
157 DiffGat<'a> = &'a Diff,
158 >,
159{
160 type Item = Result<(Row, NonZeroI64), String>;
161
162 fn next(&mut self) -> Option<Self::Item> {
163 let result = loop {
164 if self.literals_exhausted() {
165 return None;
166 }
167
168 if !self.cursor.key_valid(&self.storage) {
169 return None;
170 }
171
172 if !self.cursor.val_valid(&self.storage) {
173 let exhausted = self.step_key();
174 if exhausted {
175 return None;
176 }
177 }
178
179 match self.extract_current_row() {
180 Ok(Some(row)) => break Ok(row),
181 Ok(None) => {
182 self.cursor.step_val(&self.storage);
184 }
185 Err(err) => break Err(err),
186 }
187 };
188
189 self.cursor.step_val(&self.storage);
190
191 Some(result)
192 }
193}
194
195impl<Tr> PeekResultIterator<Tr>
196where
197 for<'a> Tr: TraceReader<
198 Key<'a>: ExtendDatums + Eq,
199 KeyContainer: BatchContainer<Owned = Row>,
200 Val<'a>: ExtendDatums,
201 TimeGat<'a>: PartialOrder<mz_repr::Timestamp>,
202 DiffGat<'a> = &'a Diff,
203 >,
204{
205 fn extract_current_row(&mut self) -> Result<Option<(Row, NonZeroI64)>, String> {
209 let arena = RowArena::new();
215
216 let key_item = self.cursor.key(&self.storage);
217 let row_item = self.cursor.val(&self.storage);
218
219 let maybe_literal;
222 let mut borrow = self.datum_vec.borrow();
223 key_item.extend_datums(&arena, &mut borrow, None);
224 row_item.extend_datums(&arena, &mut borrow, None);
225
226 if let Some(literals) = &mut self.literals
227 && let Some(literal) = literals.peek()
228 {
229 maybe_literal = literal;
232 maybe_literal.extend_datums(&arena, &mut borrow, None);
233 }
234 if let Some(result) = self
235 .map_filter_project
236 .evaluate_into(&mut borrow, &arena, &mut self.row_builder)
237 .map(|row| row.cloned())
238 .map_err_to_string_with_causes()?
239 {
240 let mut copies = Diff::ZERO;
241 self.cursor.map_times(&self.storage, |time, diff| {
242 if time.less_equal(&self.peek_timestamp) {
243 copies += diff;
244 }
245 });
246 let copies: i64 = if copies.is_negative() {
247 let row = &*borrow;
248 tracing::error!(
249 target = %self.target_id, diff = %copies, ?row,
250 "index peek encountered negative multiplicities in ok trace",
251 );
252 return Err(format!(
253 "Invalid data in source, \
254 saw retractions ({}) for row that does not exist: {:?}",
255 -copies, row,
256 ));
257 } else {
258 copies.into_inner()
259 };
260 if let Some(copies) = NonZeroI64::new(copies) {
262 Ok(Some((result, copies)))
263 } else {
264 Ok(None)
265 }
266 } else {
267 Ok(None)
268 }
269 }
270
271 fn step_key(&mut self) -> bool {
275 assert!(
276 !self.cursor.val_valid(&self.storage),
277 "must only step key when the vals for a key are exhausted"
278 );
279
280 if let Some(literals) = &mut self.literals {
281 literals.seek_next_literal_key(&mut self.cursor, &self.storage);
282
283 if literals.is_exhausted() {
284 return true;
285 }
286 } else {
287 self.cursor.step_key(&self.storage);
288 }
289
290 if !self.cursor.key_valid(&self.storage) {
291 return true;
293 }
294
295 assert!(
296 self.cursor.val_valid(&self.storage),
297 "there must always be at least one val per key"
298 );
299
300 false
301 }
302}