// mz_compute/compute_state/peek_stash.rs
use std::num::{NonZeroI64, NonZeroU64};
10use std::sync::Arc;
11use std::time::{Duration, Instant};
12
13use mz_compute_client::protocol::command::Peek;
14use mz_compute_client::protocol::response::{PeekResponse, StashedPeekResponse};
15use mz_expr::row::RowCollection;
16use mz_ore::cast::CastFrom;
17use mz_ore::task::AbortOnDropHandle;
18use mz_persist_client::Schemas;
19use mz_persist_client::cache::PersistClientCache;
20use mz_persist_types::codec_impls::UnitSchema;
21use mz_persist_types::{PersistLocation, ShardId};
22use mz_repr::{Diff, RelationDesc, Row, Timestamp};
23use mz_storage_types::sources::SourceData;
24use timely::progress::Antichain;
25use tokio::sync::oneshot;
26use tracing::debug;
27use uuid::Uuid;
28
29use crate::arrangement::manager::{PaddedTrace, TraceBundle};
30use crate::compute_state::peek_result_iterator;
31use crate::compute_state::peek_result_iterator::PeekResultIterator;
32use crate::typedefs::RowRowAgent;
33
/// State for a peek whose result is being "stashed": uploaded to a persist
/// batch by a background task instead of being returned inline. The final
/// [`PeekResponse`] (plus upload duration) arrives on `result`.
pub struct StashingPeek {
    /// The peek whose result is being stashed.
    pub peek: Peek,
    /// Iterator over the peek's result rows, reading from the target trace.
    /// Becomes `None` once exhausted (see `pump_rows`), which also triggers
    /// closing `rows_tx` below.
    peek_iterator: Option<PeekResultIterator<PaddedTrace<RowRowAgent<Timestamp, Diff>>>>,
    /// Sends batches of rows (or an error) to the background upload task.
    /// Taking this to `None` closes the channel, signaling end-of-stream to
    /// the uploader.
    rows_tx: Option<tokio::sync::mpsc::Sender<Result<Vec<(Row, NonZeroI64)>, String>>>,
    /// Receives the final response and the time the upload took.
    pub result: oneshot::Receiver<(PeekResponse, Duration)>,
    /// The span in which this peek was created, for tracing continuity.
    pub span: tracing::Span,
    /// Handle to the background upload task; dropping this (e.g. when the
    /// peek is cancelled) aborts the task.
    _abort_handle: AbortOnDropHandle<()>,
}
57
impl StashingPeek {
    /// Begins stashing the result of `peek`: builds an iterator over the
    /// peek's result rows and spawns a background task that uploads those
    /// rows to a persist batch, eventually reporting a `PeekResponse` (plus
    /// elapsed upload time) on the returned `result` channel.
    ///
    /// Rows are not produced eagerly; the caller must repeatedly invoke
    /// [`Self::pump_rows`] to feed rows from the iterator to the upload task.
    pub fn start_upload(
        persist_clients: Arc<PersistClientCache>,
        persist_location: &PersistLocation,
        mut peek: Peek,
        mut trace_bundle: TraceBundle,
        batch_max_runs: usize,
    ) -> Self {
        // Bounded channel: when the uploader falls behind, `pump_rows` stops
        // producing (its `try_reserve` fails) rather than buffering unboundedly.
        let (rows_tx, rows_rx) = tokio::sync::mpsc::channel(10);
        let (result_tx, result_rx) = oneshot::channel::<(PeekResponse, Duration)>();

        // NOTE(review): `persist_clients` is already owned here, so this
        // `Arc::clone` looks redundant — harmless, but worth confirming.
        let persist_clients = Arc::clone(&persist_clients);
        let persist_location = persist_location.clone();

        let peek_uuid = peek.uuid;
        let relation_desc = peek.result_desc.clone();

        let oks_handle = trace_bundle.oks_mut();

        // Iterator over the peek's result rows, applying the peek's MFP and
        // literal constraints while reading the "oks" trace at the peek
        // timestamp.
        let peek_iterator = peek_result_iterator::PeekResultIterator::new(
            peek.target.id(),
            peek.map_filter_project.clone(),
            peek.timestamp,
            peek.literal_constraints.as_deref_mut(),
            oks_handle,
        );

        // Upper bound on rows the finishing actually needs (if any); lets the
        // upload stop early once enough rows have been written.
        let rows_needed_by_finishing = peek.finishing.num_rows_needed();

        let task_handle = mz_ore::task::spawn(
            || format!("peek_stash::stash_peek_response({peek_uuid})"),
            async move {
                let start = Instant::now();

                let result = Self::do_upload(
                    &persist_clients,
                    persist_location,
                    batch_max_runs,
                    peek.uuid,
                    relation_desc,
                    rows_needed_by_finishing,
                    rows_rx,
                )
                .await;

                // Infrastructure errors from the upload become an error
                // response; row-level errors were already mapped inside
                // `do_upload`.
                let result = match result {
                    Ok(peek_response) => peek_response,
                    Err(e) => PeekResponse::Error(e.to_string()),
                };
                match result_tx.send((result, start.elapsed())) {
                    Ok(()) => {}
                    // The receiver is gone, meaning the peek was dropped
                    // (cancelled) before the result arrived; just log.
                    Err((_result, elapsed)) => {
                        debug!(duration = ?elapsed, "dropping result for cancelled peek {}", peek_uuid)
                    }
                }
            },
        );

        Self {
            peek,
            peek_iterator: Some(peek_iterator),
            rows_tx: Some(rows_tx),
            result: result_rx,
            span: tracing::Span::current(),
            // Cancelling/dropping the peek aborts the upload task.
            _abort_handle: task_handle.abort_on_drop(),
        }
    }

    /// Uploads rows received on `rows_rx` into a single persist batch and
    /// returns the corresponding [`PeekResponse::Stashed`] (or
    /// [`PeekResponse::Error`] if the row stream reported an error).
    ///
    /// `max_rows`, when present, caps how many rows (counted with
    /// multiplicity) are written before the upload stops early.
    /// An `Err` return signals an infrastructure error (e.g. failing to open
    /// the persist location), which the caller converts to an error response.
    async fn do_upload(
        persist_clients: &PersistClientCache,
        persist_location: PersistLocation,
        batch_max_runs: usize,
        peek_uuid: Uuid,
        relation_desc: RelationDesc,
        max_rows: Option<usize>,
        mut rows_rx: tokio::sync::mpsc::Receiver<Result<Vec<(Row, NonZeroI64)>, String>>,
    ) -> Result<PeekResponse, String> {
        let client = persist_clients
            .open(persist_location)
            .await
            .map_err(|e| e.to_string())?;

        // Derive a deterministic shard id from the peek's UUID; "s<uuid>" is
        // the textual ShardId format, so parsing cannot fail.
        let shard_id = format!("s{}", peek_uuid);
        let shard_id = ShardId::try_from(shard_id).expect("can parse");
        let write_schemas: Schemas<SourceData, ()> = Schemas {
            id: None,
            key: Arc::new(relation_desc.clone()),
            val: Arc::new(UnitSchema),
        };

        // All rows are written at a single nominal timestamp; the batch
        // covers exactly [result_ts, result_ts + 1).
        let result_ts = Timestamp::default();
        let lower = Antichain::from_elem(result_ts);
        let upper = Antichain::from_elem(result_ts.step_forward());

        let mut batch_builder = client
            .batch_builder::<SourceData, (), Timestamp, i64>(
                shard_id,
                write_schemas,
                lower,
                Some(batch_max_runs),
            )
            .await;

        // Rows written so far, counted with multiplicity (sum of diffs).
        let mut num_rows: u64 = 0;

        'outer: loop {
            let row = rows_rx.recv().await;
            match row {
                Some(Ok(rows)) => {
                    for (row, diff) in rows {
                        // Diffs are expected to be positive here; a negative
                        // diff would panic.
                        num_rows +=
                            u64::from(NonZeroU64::try_from(diff).expect("diff fits into u64"));
                        let diff: i64 = diff.into();

                        batch_builder
                            .add(&SourceData(Ok(row)), &(), &Timestamp::default(), &diff)
                            .await
                            .expect("invalid usage");

                        if let Some(max_rows) = max_rows {
                            if num_rows >= u64::cast_from(max_rows) {
                                // We have all the rows the finishing needs.
                                // Dropping the receiver closes the channel, so
                                // the producer's sends/reserves start failing
                                // and it stops pumping rows.
                                drop(rows_rx);
                                break 'outer;
                            }
                        }
                    }
                }
                // A row-level error: report it as an error response rather
                // than an infrastructure failure.
                Some(Err(err)) => return Ok(PeekResponse::Error(err)),
                // Channel closed by the producer: all rows have been sent.
                None => {
                    break;
                }
            }
        }

        let batch = batch_builder.finish(upper).await.expect("invalid usage");

        let stashed_response = StashedPeekResponse {
            num_rows_batches: u64::cast_from(num_rows),
            encoded_size_bytes: batch.encoded_size_bytes(),
            relation_desc,
            shard_id,
            batches: vec![batch.into_transmittable_batch()],
            // No rows are returned inline; everything lives in the batch.
            inline_rows: vec![RowCollection::new(vec![], &[])],
        };
        let result = PeekResponse::Stashed(Box::new(stashed_response));
        Ok(result)
    }

    /// Feeds up to `num_batches` batches of at most `batch_size` rows each
    /// from the peek iterator to the background upload task.
    ///
    /// Stops early (to be retried on a later call) when the channel has no
    /// free capacity or has been closed. When the iterator is exhausted, both
    /// the iterator and the sender are dropped, which closes the channel and
    /// signals end-of-stream to the uploader.
    pub fn pump_rows(&mut self, mut num_batches: usize, batch_size: usize) {
        while num_batches > 0
            && let Some(row_iter) = self.peek_iterator.as_mut()
        {
            // Reserve a send slot up front so we never pull rows off the
            // iterator that we then cannot send.
            let permit = match self
                .rows_tx
                .as_mut()
                .expect("missing rows_tx")
                .try_reserve()
            {
                Ok(permit) => permit,
                // Channel full (uploader is behind) or closed (uploader has
                // all the rows it needs): stop for now.
                Err(_) => {
                    break;
                }
            };

            // Collect up to `batch_size` rows, short-circuiting on the first
            // row-level error.
            let rows: Result<Vec<_>, _> = row_iter.take(batch_size).collect();
            match rows {
                Ok(rows) if rows.is_empty() => {
                    // Iterator exhausted: release the unused permit and tear
                    // down iterator and sender; dropping the sender closes
                    // the channel, which `do_upload` sees as end-of-stream.
                    drop(permit);
                    self.peek_iterator.take();
                    self.rows_tx.take();
                    break;
                }
                Ok(rows) => {
                    permit.send(Ok(rows));
                }
                Err(e) => {
                    // Forward the error to the uploader, which turns it into
                    // an error response.
                    // NOTE(review): iterator/sender are not torn down here,
                    // so later calls may keep pumping — presumably the error
                    // response retires the peek elsewhere; confirm.
                    permit.send(Err(e));
                }
            }

            num_batches -= 1;
        }
    }
}