mz_compute/compute_state/
peek_result_iterator.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5
6//! Code for extracting a peek result out of compute state/an arrangement.
7
8use std::iter::FusedIterator;
9use std::num::NonZeroI64;
10use std::ops::Range;
11
12use differential_dataflow::trace::implementations::BatchContainer;
13use differential_dataflow::trace::{Cursor, TraceReader};
14use mz_ore::result::ResultExt;
15use mz_repr::fixed_length::ToDatumIter;
16use mz_repr::{DatumVec, Diff, GlobalId, Row, RowArena};
17use timely::order::PartialOrder;
18
19pub struct PeekResultIterator<Tr>
20where
21    Tr: TraceReader,
22{
23    // For debug/trace logging.
24    target_id: GlobalId,
25    cursor: Tr::Cursor,
26    storage: Tr::Storage,
27    map_filter_project: mz_expr::SafeMfpPlan,
28    peek_timestamp: mz_repr::Timestamp,
29    row_builder: Row,
30    datum_vec: DatumVec,
31    literals: Option<Literals<Tr>>,
32}
33
34/// Helper to handle literals in peeks
35struct Literals<Tr: TraceReader> {
36    /// The literals in a container, sorted by `Ord`.
37    literals: Tr::KeyContainer,
38    /// The range of the literals that are still available.
39    range: Range<usize>,
40    /// The current index in the literals.
41    current_index: Option<usize>,
42}
43
44impl<Tr: TraceReader<KeyOwn: Ord>> Literals<Tr> {
45    /// Construct a new `Literals` from a mutable slice of literals. Sorts contents.
46    fn new(literals: &mut [Tr::KeyOwn], cursor: &mut Tr::Cursor, storage: &Tr::Storage) -> Self {
47        // We have to sort the literal constraints because cursor.seek_key can
48        // seek only forward.
49        literals.sort();
50        let mut container = Tr::KeyContainer::with_capacity(literals.len());
51        for constraint in literals {
52            container.push_own(constraint)
53        }
54        let range = 0..container.len();
55        let mut this = Self {
56            literals: container,
57            range,
58            current_index: None,
59        };
60        this.seek_next_literal_key(cursor, storage);
61        this
62    }
63
64    /// Returns the current literal, if any.
65    fn peek(&self) -> Option<Tr::Key<'_>> {
66        self.current_index
67            .and_then(|index| self.literals.get(index))
68    }
69
70    /// Returns `true` if there are no more literals to process.
71    fn is_exhausted(&self) -> bool {
72        self.current_index.is_none()
73    }
74
75    /// Seeks the cursor to the next key of a matching literal, if any.
76    fn seek_next_literal_key(&mut self, cursor: &mut Tr::Cursor, storage: &Tr::Storage) {
77        while let Some(index) = self.range.next() {
78            let literal = self.literals.get(index).expect("index out of bounds");
79            cursor.seek_key(storage, literal);
80            if cursor.get_key(storage).map_or(true, |key| key == literal) {
81                self.current_index = Some(index);
82                return;
83            }
84            // The cursor landed on a record that has a different key,
85            // meaning that there is no record whose key would match the
86            // current literal.
87        }
88        self.current_index = None;
89    }
90}
91
92/// An [Iterator] that extracts a peek result from a [TraceReader].
93///
94/// The iterator will apply a given `MapFilterProject` and obey literal
95/// constraints, if any.
96impl<Tr> PeekResultIterator<Tr>
97where
98    for<'a> Tr: TraceReader<
99            Key<'a>: ToDatumIter + Eq,
100            KeyOwn = Row,
101            Val<'a>: ToDatumIter,
102            TimeGat<'a>: PartialOrder<mz_repr::Timestamp>,
103            DiffGat<'a> = &'a Diff,
104        >,
105{
106    pub fn new(
107        target_id: GlobalId,
108        map_filter_project: mz_expr::SafeMfpPlan,
109        peek_timestamp: mz_repr::Timestamp,
110        literal_constraints: Option<&mut [Row]>,
111        trace_reader: &mut Tr,
112    ) -> Self {
113        let (mut cursor, storage) = trace_reader.cursor();
114        let literals = literal_constraints
115            .map(|constraints| Literals::new(constraints, &mut cursor, &storage));
116
117        Self {
118            target_id,
119            cursor,
120            storage,
121            map_filter_project,
122            peek_timestamp,
123            row_builder: Row::default(),
124            datum_vec: DatumVec::new(),
125            literals,
126        }
127    }
128
129    /// Returns `true` if the iterator has no more literals to process, or if there are no literals at all.
130    fn literals_exhausted(&self) -> bool {
131        self.literals.as_ref().map_or(false, Literals::is_exhausted)
132    }
133}
134
135impl<Tr> FusedIterator for PeekResultIterator<Tr> where
136    for<'a> Tr: TraceReader<
137            Key<'a>: ToDatumIter + Eq,
138            KeyOwn = Row,
139            Val<'a>: ToDatumIter,
140            TimeGat<'a>: PartialOrder<mz_repr::Timestamp>,
141            DiffGat<'a> = &'a Diff,
142        >
143{
144}
145
146impl<Tr> Iterator for PeekResultIterator<Tr>
147where
148    for<'a> Tr: TraceReader<
149            Key<'a>: ToDatumIter + Eq,
150            KeyOwn = Row,
151            Val<'a>: ToDatumIter,
152            TimeGat<'a>: PartialOrder<mz_repr::Timestamp>,
153            DiffGat<'a> = &'a Diff,
154        >,
155{
156    type Item = Result<(Row, NonZeroI64), String>;
157
158    fn next(&mut self) -> Option<Self::Item> {
159        let result = loop {
160            if self.literals_exhausted() {
161                return None;
162            }
163
164            if !self.cursor.key_valid(&self.storage) {
165                return None;
166            }
167
168            if !self.cursor.val_valid(&self.storage) {
169                let exhausted = self.step_key();
170                if exhausted {
171                    return None;
172                }
173            }
174
175            match self.extract_current_row() {
176                Ok(Some(row)) => break Ok(row),
177                Ok(None) => {
178                    // Have to keep stepping and try with the next val.
179                    self.cursor.step_val(&self.storage);
180                }
181                Err(err) => break Err(err),
182            }
183        };
184
185        self.cursor.step_val(&self.storage);
186
187        Some(result)
188    }
189}
190
191impl<Tr> PeekResultIterator<Tr>
192where
193    for<'a> Tr: TraceReader<
194            Key<'a>: ToDatumIter + Eq,
195            KeyOwn = Row,
196            Val<'a>: ToDatumIter,
197            TimeGat<'a>: PartialOrder<mz_repr::Timestamp>,
198            DiffGat<'a> = &'a Diff,
199        >,
200{
201    /// Extracts and returns the row currently pointed at by our cursor. Returns
202    /// `Ok(None)` if our MapFilterProject evaluates to `None`. Also returns any
203    /// errors that arise from evaluating the MapFilterProject.
204    fn extract_current_row(&mut self) -> Result<Option<(Row, NonZeroI64)>, String> {
205        // TODO: This arena could be maintained and reused for longer,
206        // but it wasn't clear at what interval we should flush
207        // it to ensure we don't accidentally spike our memory use.
208        // This choice is conservative, and not the end of the world
209        // from a performance perspective.
210        let arena = RowArena::new();
211
212        let key_item = self.cursor.key(&self.storage);
213        let key = key_item.to_datum_iter();
214        let row_item = self.cursor.val(&self.storage);
215        let row = row_item.to_datum_iter();
216
217        // An optional literal that we might have added to the borrow. Needs to be declared
218        // before the borrow to ensure correct drop order.
219        let maybe_literal;
220        let mut borrow = self.datum_vec.borrow();
221        borrow.extend(key);
222        borrow.extend(row);
223
224        if let Some(literals) = &mut self.literals
225            && let Some(literal) = literals.peek()
226        {
227            // The peek was created from an IndexedFilter join. We have to add those columns
228            // here that the join would add in a dataflow.
229            maybe_literal = literal;
230            borrow.extend(maybe_literal.to_datum_iter());
231        }
232        if let Some(result) = self
233            .map_filter_project
234            .evaluate_into(&mut borrow, &arena, &mut self.row_builder)
235            .map(|row| row.cloned())
236            .map_err_to_string_with_causes()?
237        {
238            let mut copies = Diff::ZERO;
239            self.cursor.map_times(&self.storage, |time, diff| {
240                if time.less_equal(&self.peek_timestamp) {
241                    copies += diff;
242                }
243            });
244            let copies: i64 = if copies.is_negative() {
245                let row = &*borrow;
246                tracing::error!(
247                    target = %self.target_id, diff = %copies, ?row,
248                    "index peek encountered negative multiplicities in ok trace",
249                );
250                return Err(format!(
251                    "Invalid data in source, \
252                             saw retractions ({}) for row that does not exist: {:?}",
253                    -copies, row,
254                ));
255            } else {
256                copies.into_inner()
257            };
258            // if copies > 0 ... otherwise skip
259            if let Some(copies) = NonZeroI64::new(copies) {
260                Ok(Some((result, copies)))
261            } else {
262                Ok(None)
263            }
264        } else {
265            Ok(None)
266        }
267    }
268
269    /// Steps the key forward, respecting literal constraints.
270    ///
271    /// Returns `true` if we are exhausted.
272    fn step_key(&mut self) -> bool {
273        assert!(
274            !self.cursor.val_valid(&self.storage),
275            "must only step key when the vals for a key are exhausted"
276        );
277
278        if let Some(literals) = &mut self.literals {
279            literals.seek_next_literal_key(&mut self.cursor, &self.storage);
280
281            if literals.is_exhausted() {
282                return true;
283            }
284        } else {
285            self.cursor.step_key(&self.storage);
286        }
287
288        if !self.cursor.key_valid(&self.storage) {
289            // We're exhausted!
290            return true;
291        }
292
293        assert!(
294            self.cursor.val_valid(&self.storage),
295            "there must always be at least one val per key"
296        );
297
298        false
299    }
300}