mz_compute/compute_state/peek_stash.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.

//! For eligible peeks, we send the result back via the peek stash (aka persist
//! blob), instead of inline in `ComputeResponse`.
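//!
//! The flow, as implemented below: the worker thread pumps result rows out of
//! the arrangement via [`StashingPeek::pump_rows`] into a bounded channel; a
//! background task drains that channel, writes the rows to a persist batch in
//! a peek-specific shard, and resolves the peek with a
//! [`PeekResponse::Stashed`] handle to that batch.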

use std::num::{NonZeroI64, NonZeroU64};
use std::sync::Arc;
use std::time::{Duration, Instant};

use mz_compute_client::protocol::command::Peek;
use mz_compute_client::protocol::response::{PeekResponse, StashedPeekResponse};
use mz_expr::row::RowCollection;
use mz_ore::cast::CastFrom;
use mz_ore::task::AbortOnDropHandle;
use mz_persist_client::Schemas;
use mz_persist_client::cache::PersistClientCache;
use mz_persist_types::codec_impls::UnitSchema;
use mz_persist_types::{PersistLocation, ShardId};
use mz_repr::{Diff, RelationDesc, Row, Timestamp};
use mz_storage_types::sources::SourceData;
use timely::progress::Antichain;
use tokio::sync::oneshot;
use tracing::debug;
use uuid::Uuid;

use crate::arrangement::manager::{PaddedTrace, TraceBundle};
use crate::compute_state::peek_result_iterator;
use crate::compute_state::peek_result_iterator::PeekResultIterator;
use crate::typedefs::RowRowAgent;

/// An async task that stashes a peek response in persist and yields a handle to
/// the batch in a [PeekResponse::Stashed].
///
/// Note that `StashingPeek` intentionally does not implement or derive
/// `Clone`, as each `StashingPeek` is meant to be dropped after it's
/// done or no longer needed.
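///
/// A sketch of how a caller might drive this type (illustrative only: the
/// surrounding worker loop and the argument values are assumptions, not part
/// of this module):
///
/// ```ignore
/// let mut stashing_peek = StashingPeek::start_upload(
///     persist_clients,
///     &persist_location,
///     peek,
///     trace_bundle,
///     batch_max_runs,
/// );
///
/// // Periodically feed rows to the upload task and check for the response.
/// loop {
///     stashing_peek.pump_rows(4, 1024);
///     if let Ok((response, _elapsed)) = stashing_peek.result.try_recv() {
///         // Forward `response` (e.g. a `PeekResponse::Stashed`) to the
///         // client, then drop `stashing_peek`.
///         break;
///     }
/// }
/// ```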
pub struct StashingPeek {
    pub peek: Peek,
    /// Iterator for the results. The worker thread has to continually pump
    /// results from this into the `rows_tx` channel.
    peek_iterator: Option<PeekResultIterator<PaddedTrace<RowRowAgent<Timestamp, Diff>>>>,
    /// We can't hand a [PeekResultIterator] to our async upload task because
    /// the underlying trace reader is not `Send`/`Sync`, so we instead use a
    /// channel to send result rows from the worker thread to the async
    /// background task.
    rows_tx: Option<tokio::sync::mpsc::Sender<Result<Vec<(Row, NonZeroI64)>, String>>>,
    /// The result of the background task, eventually.
    pub result: oneshot::Receiver<(PeekResponse, Duration)>,
    /// The `tracing::Span` tracking this peek's operation.
    pub span: tracing::Span,
    /// A background task that's responsible for producing the peek results.
    /// If we're no longer interested in the results, we abort the task.
    _abort_handle: AbortOnDropHandle<()>,
}

impl StashingPeek {
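    /// Spawns a background task that will upload this peek's result rows to a
    /// persist batch, and returns a `StashingPeek` for tracking it. The caller
    /// must repeatedly call [`StashingPeek::pump_rows`] to feed result rows
    /// from the trace to the upload task.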
    pub fn start_upload(
        persist_clients: Arc<PersistClientCache>,
        persist_location: &PersistLocation,
        mut peek: Peek,
        mut trace_bundle: TraceBundle,
        batch_max_runs: usize,
    ) -> Self {
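        // A small bounded channel: `pump_rows` uses `try_reserve`, so a full
        // channel applies backpressure to the worker instead of buffering
        // rows unboundedly.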
        let (rows_tx, rows_rx) = tokio::sync::mpsc::channel(10);
        let (result_tx, result_rx) = oneshot::channel::<(PeekResponse, Duration)>();

        let persist_location = persist_location.clone();

        let peek_uuid = peek.uuid;
        let relation_desc = peek.result_desc.clone();

        let oks_handle = trace_bundle.oks_mut();

        let peek_iterator = peek_result_iterator::PeekResultIterator::new(
            peek.target.id(),
            peek.map_filter_project.clone(),
            peek.timestamp,
            peek.literal_constraints.as_deref_mut(),
            oks_handle,
        );

        let rows_needed_by_finishing = peek.finishing.num_rows_needed();

        let task_handle = mz_ore::task::spawn(
            || format!("peek_stash::stash_peek_response({peek_uuid})"),
            async move {
                let start = Instant::now();

                let result = Self::do_upload(
                    &persist_clients,
                    persist_location,
                    batch_max_runs,
                    peek.uuid,
                    relation_desc,
                    rows_needed_by_finishing,
                    rows_rx,
                )
                .await;

                let result = match result {
                    Ok(peek_response) => peek_response,
                    Err(e) => PeekResponse::Error(e.to_string()),
                };
                match result_tx.send((result, start.elapsed())) {
                    Ok(()) => {}
                    Err((_result, elapsed)) => {
                        debug!(duration = ?elapsed, "dropping result for cancelled peek {}", peek_uuid)
                    }
                }
            },
        );

        Self {
            peek,
            peek_iterator: Some(peek_iterator),
            rows_tx: Some(rows_tx),
            result: result_rx,
            span: tracing::Span::current(),
            _abort_handle: task_handle.abort_on_drop(),
        }
    }

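    /// Receives result rows on `rows_rx`, writes them to a new persist batch
    /// in a peek-specific shard, and returns a [`PeekResponse::Stashed`]
    /// describing that batch.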
    async fn do_upload(
        persist_clients: &PersistClientCache,
        persist_location: PersistLocation,
        batch_max_runs: usize,
        peek_uuid: Uuid,
        relation_desc: RelationDesc,
        max_rows: Option<usize>, // The number of rows needed by the RowSetFinishing's offset + limit.
        mut rows_rx: tokio::sync::mpsc::Receiver<Result<Vec<(Row, NonZeroI64)>, String>>,
    ) -> Result<PeekResponse, String> {
        let client = persist_clients
            .open(persist_location)
            .await
            .map_err(|e| e.to_string())?;

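        // Each peek gets its own shard, derived deterministically from the
        // peek's UUID: a `ShardId` renders as "s" followed by a UUID.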
        let shard_id = format!("s{}", peek_uuid);
        let shard_id = ShardId::try_from(shard_id).expect("can parse");
        let write_schemas: Schemas<SourceData, ()> = Schemas {
            id: None,
            key: Arc::new(relation_desc.clone()),
            val: Arc::new(UnitSchema),
        };

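        // All rows are written at a single timestamp, with the batch's upper
        // exactly one tick beyond it, so the batch covers a single closed
        // interval of times.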
        let result_ts = Timestamp::default();
        let lower = Antichain::from_elem(result_ts);
        let upper = Antichain::from_elem(result_ts.step_forward());

        // We have to use SourceData, which is a wrapper around a Result<Row,
        // DataflowError>, because the bare columnar Row encoder doesn't support
        // encoding rows with zero columns.
        //
        // TODO: We _could_ work around the above by teaching the bare columnar
        // Row encoder about zero-column rows.
        let mut batch_builder = client
            .batch_builder::<SourceData, (), Timestamp, i64>(
                shard_id,
                write_schemas,
                lower,
                Some(batch_max_runs),
            )
            .await;

        let mut num_rows: u64 = 0;

        'receive: loop {
            let row = rows_rx.recv().await;
            match row {
                Some(Ok(rows)) => {
                    for (row, diff) in rows {
                        num_rows +=
                            u64::from(NonZeroU64::try_from(diff).expect("diff fits into u64"));
                        let diff: i64 = diff.into();

                        batch_builder
                            .add(&SourceData(Ok(row)), &(), &Timestamp::default(), &diff)
                            .await
                            .expect("invalid usage");

                        // Stop receiving entirely once we have enough rows to
                        // satisfy the RowSetFinishing's offset + limit; any
                        // further rows would be discarded when the finishing
                        // is applied.
                        if let Some(max_rows) = max_rows {
                            if num_rows >= u64::cast_from(max_rows) {
                                break 'receive;
                            }
                        }
                    }
                }
                Some(Err(err)) => return Ok(PeekResponse::Error(err)),
                None => break,
            }
        }

        let batch = batch_builder.finish(upper).await.expect("invalid usage");

        let stashed_response = StashedPeekResponse {
            num_rows_batches: num_rows,
            encoded_size_bytes: batch.encoded_size_bytes(),
            relation_desc,
            shard_id,
            batches: vec![batch.into_transmittable_batch()],
            inline_rows: RowCollection::new(vec![], &[]),
        };
        let result = PeekResponse::Stashed(Box::new(stashed_response));
        Ok(result)
    }

    /// Pumps rows from the [PeekResultIterator] to the async task, via our
    /// `rows_tx`. Will pump at most `batch_size` rows in one batch, and at
    /// most the given `num_batches` batches.
    pub fn pump_rows(&mut self, mut num_batches: usize, batch_size: usize) {
        while let Some(row_iter) = self.peek_iterator.as_mut() {
            // Try to reserve space in the channel before pulling rows from the
            // iterator.
            let permit = match self
                .rows_tx
                .as_mut()
                .expect("missing rows_tx")
                .try_reserve()
            {
                Ok(permit) => permit,
                Err(_) => {
                    // Channel is full (or the upload task is gone), so we
                    // can't send more rows right now.
                    break;
                }
            };

            let rows: Result<Vec<_>, _> = row_iter.take(batch_size).collect();
            match rows {
                Ok(rows) if rows.is_empty() => {
                    // Iterator is exhausted, we're done.
                    drop(permit);
                    self.peek_iterator.take();
                    self.rows_tx.take();
                    break;
                }
                Ok(rows) => {
                    permit.send(Ok(rows));
                }
                Err(e) => {
                    permit.send(Err(e));
                }
            }

            num_batches -= 1;
            if num_batches == 0 {
                break;
            }
        }
    }
}