Skip to main content

mz_compute/compute_state/
peek_result_iterator.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5
6//! Code for extracting a peek result out of compute state/an arrangement.
7
8use std::iter::FusedIterator;
9use std::num::NonZeroI64;
10use std::ops::Range;
11
12use differential_dataflow::trace::implementations::BatchContainer;
13use differential_dataflow::trace::{Cursor, TraceReader};
14use mz_ore::result::ResultExt;
15use mz_repr::fixed_length::ToDatumIter;
16use mz_repr::{DatumVec, Diff, GlobalId, Row, RowArena};
17use timely::order::PartialOrder;
18
19pub struct PeekResultIterator<Tr>
20where
21    Tr: TraceReader,
22{
23    // For debug/trace logging.
24    target_id: GlobalId,
25    cursor: Tr::Cursor,
26    storage: Tr::Storage,
27    map_filter_project: mz_expr::SafeMfpPlan,
28    peek_timestamp: mz_repr::Timestamp,
29    row_builder: Row,
30    datum_vec: DatumVec,
31    literals: Option<Literals<Tr>>,
32}
33
34/// Helper to handle literals in peeks
35struct Literals<Tr: TraceReader> {
36    /// The literals in a container, sorted by `Ord`.
37    literals: Tr::KeyContainer,
38    /// The range of the literals that are still available.
39    range: Range<usize>,
40    /// The current index in the literals.
41    current_index: Option<usize>,
42}
43
44impl<Tr: TraceReader<KeyContainer: BatchContainer<Owned: Ord>>> Literals<Tr> {
45    /// Construct a new `Literals` from a mutable slice of literals. Sorts contents.
46    fn new(
47        literals: &mut [<Tr::KeyContainer as BatchContainer>::Owned],
48        cursor: &mut Tr::Cursor,
49        storage: &Tr::Storage,
50    ) -> Self {
51        // We have to sort the literal constraints because cursor.seek_key can
52        // seek only forward.
53        literals.sort();
54        let mut container = Tr::KeyContainer::with_capacity(literals.len());
55        for constraint in literals {
56            container.push_own(constraint)
57        }
58        let range = 0..container.len();
59        let mut this = Self {
60            literals: container,
61            range,
62            current_index: None,
63        };
64        this.seek_next_literal_key(cursor, storage);
65        this
66    }
67
68    /// Returns the current literal, if any.
69    fn peek(&self) -> Option<Tr::Key<'_>> {
70        self.current_index
71            .and_then(|index| self.literals.get(index))
72    }
73
74    /// Returns `true` if there are no more literals to process.
75    fn is_exhausted(&self) -> bool {
76        self.current_index.is_none()
77    }
78
79    /// Seeks the cursor to the next key of a matching literal, if any.
80    fn seek_next_literal_key(&mut self, cursor: &mut Tr::Cursor, storage: &Tr::Storage) {
81        while let Some(index) = self.range.next() {
82            let literal = self.literals.get(index).expect("index out of bounds");
83            cursor.seek_key(storage, literal);
84            if cursor.get_key(storage).map_or(true, |key| key == literal) {
85                self.current_index = Some(index);
86                return;
87            }
88            // The cursor landed on a record that has a different key,
89            // meaning that there is no record whose key would match the
90            // current literal.
91        }
92        self.current_index = None;
93    }
94}
95
96/// An [Iterator] that extracts a peek result from a [TraceReader].
97///
98/// The iterator will apply a given `MapFilterProject` and obey literal
99/// constraints, if any.
100impl<Tr> PeekResultIterator<Tr>
101where
102    for<'a> Tr: TraceReader<
103            Key<'a>: ToDatumIter + Eq,
104            KeyContainer: BatchContainer<Owned = Row>,
105            Val<'a>: ToDatumIter,
106            TimeGat<'a>: PartialOrder<mz_repr::Timestamp>,
107            DiffGat<'a> = &'a Diff,
108        >,
109{
110    pub fn new(
111        target_id: GlobalId,
112        map_filter_project: mz_expr::SafeMfpPlan,
113        peek_timestamp: mz_repr::Timestamp,
114        literal_constraints: Option<&mut [Row]>,
115        trace_reader: &mut Tr,
116    ) -> Self {
117        let (mut cursor, storage) = trace_reader.cursor();
118        let literals = literal_constraints
119            .map(|constraints| Literals::new(constraints, &mut cursor, &storage));
120
121        Self {
122            target_id,
123            cursor,
124            storage,
125            map_filter_project,
126            peek_timestamp,
127            row_builder: Row::default(),
128            datum_vec: DatumVec::new(),
129            literals,
130        }
131    }
132
133    /// Returns `true` if the iterator has no more literals to process, or if there are no literals at all.
134    fn literals_exhausted(&self) -> bool {
135        self.literals.as_ref().map_or(false, Literals::is_exhausted)
136    }
137}
138
139impl<Tr> FusedIterator for PeekResultIterator<Tr> where
140    for<'a> Tr: TraceReader<
141            Key<'a>: ToDatumIter + Eq,
142            KeyContainer: BatchContainer<Owned = Row>,
143            Val<'a>: ToDatumIter,
144            TimeGat<'a>: PartialOrder<mz_repr::Timestamp>,
145            DiffGat<'a> = &'a Diff,
146        >
147{
148}
149
150impl<Tr> Iterator for PeekResultIterator<Tr>
151where
152    for<'a> Tr: TraceReader<
153            Key<'a>: ToDatumIter + Eq,
154            KeyContainer: BatchContainer<Owned = Row>,
155            Val<'a>: ToDatumIter,
156            TimeGat<'a>: PartialOrder<mz_repr::Timestamp>,
157            DiffGat<'a> = &'a Diff,
158        >,
159{
160    type Item = Result<(Row, NonZeroI64), String>;
161
162    fn next(&mut self) -> Option<Self::Item> {
163        let result = loop {
164            if self.literals_exhausted() {
165                return None;
166            }
167
168            if !self.cursor.key_valid(&self.storage) {
169                return None;
170            }
171
172            if !self.cursor.val_valid(&self.storage) {
173                let exhausted = self.step_key();
174                if exhausted {
175                    return None;
176                }
177            }
178
179            match self.extract_current_row() {
180                Ok(Some(row)) => break Ok(row),
181                Ok(None) => {
182                    // Have to keep stepping and try with the next val.
183                    self.cursor.step_val(&self.storage);
184                }
185                Err(err) => break Err(err),
186            }
187        };
188
189        self.cursor.step_val(&self.storage);
190
191        Some(result)
192    }
193}
194
195impl<Tr> PeekResultIterator<Tr>
196where
197    for<'a> Tr: TraceReader<
198            Key<'a>: ToDatumIter + Eq,
199            KeyContainer: BatchContainer<Owned = Row>,
200            Val<'a>: ToDatumIter,
201            TimeGat<'a>: PartialOrder<mz_repr::Timestamp>,
202            DiffGat<'a> = &'a Diff,
203        >,
204{
205    /// Extracts and returns the row currently pointed at by our cursor. Returns
206    /// `Ok(None)` if our MapFilterProject evaluates to `None`. Also returns any
207    /// errors that arise from evaluating the MapFilterProject.
208    fn extract_current_row(&mut self) -> Result<Option<(Row, NonZeroI64)>, String> {
209        // TODO: This arena could be maintained and reused for longer,
210        // but it wasn't clear at what interval we should flush
211        // it to ensure we don't accidentally spike our memory use.
212        // This choice is conservative, and not the end of the world
213        // from a performance perspective.
214        let arena = RowArena::new();
215
216        let key_item = self.cursor.key(&self.storage);
217        let key = key_item.to_datum_iter();
218        let row_item = self.cursor.val(&self.storage);
219        let row = row_item.to_datum_iter();
220
221        // An optional literal that we might have added to the borrow. Needs to be declared
222        // before the borrow to ensure correct drop order.
223        let maybe_literal;
224        let mut borrow = self.datum_vec.borrow();
225        borrow.extend(key);
226        borrow.extend(row);
227
228        if let Some(literals) = &mut self.literals
229            && let Some(literal) = literals.peek()
230        {
231            // The peek was created from an IndexedFilter join. We have to add those columns
232            // here that the join would add in a dataflow.
233            maybe_literal = literal;
234            borrow.extend(maybe_literal.to_datum_iter());
235        }
236        if let Some(result) = self
237            .map_filter_project
238            .evaluate_into(&mut borrow, &arena, &mut self.row_builder)
239            .map(|row| row.cloned())
240            .map_err_to_string_with_causes()?
241        {
242            let mut copies = Diff::ZERO;
243            self.cursor.map_times(&self.storage, |time, diff| {
244                if time.less_equal(&self.peek_timestamp) {
245                    copies += diff;
246                }
247            });
248            let copies: i64 = if copies.is_negative() {
249                let row = &*borrow;
250                tracing::error!(
251                    target = %self.target_id, diff = %copies, ?row,
252                    "index peek encountered negative multiplicities in ok trace",
253                );
254                return Err(format!(
255                    "Invalid data in source, \
256                             saw retractions ({}) for row that does not exist: {:?}",
257                    -copies, row,
258                ));
259            } else {
260                copies.into_inner()
261            };
262            // if copies > 0 ... otherwise skip
263            if let Some(copies) = NonZeroI64::new(copies) {
264                Ok(Some((result, copies)))
265            } else {
266                Ok(None)
267            }
268        } else {
269            Ok(None)
270        }
271    }
272
273    /// Steps the key forward, respecting literal constraints.
274    ///
275    /// Returns `true` if we are exhausted.
276    fn step_key(&mut self) -> bool {
277        assert!(
278            !self.cursor.val_valid(&self.storage),
279            "must only step key when the vals for a key are exhausted"
280        );
281
282        if let Some(literals) = &mut self.literals {
283            literals.seek_next_literal_key(&mut self.cursor, &self.storage);
284
285            if literals.is_exhausted() {
286                return true;
287            }
288        } else {
289            self.cursor.step_key(&self.storage);
290        }
291
292        if !self.cursor.key_valid(&self.storage) {
293            // We're exhausted!
294            return true;
295        }
296
297        assert!(
298            self.cursor.val_valid(&self.storage),
299            "there must always be at least one val per key"
300        );
301
302        false
303    }
304}