// mz_compute/compute_state/peek_stash.rs
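//! Stashing of peek results in persist.
//!
//! Instead of returning peek results inline in a [`PeekResponse`], the
//! [`StashingPeek`] machinery writes them to a persist batch and returns a
//! [`PeekResponse::Stashed`] that references that batch.
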
use std::num::{NonZeroI64, NonZeroU64};
use std::sync::Arc;
use std::time::{Duration, Instant};

use mz_compute_client::protocol::command::Peek;
use mz_compute_client::protocol::response::{PeekResponse, StashedPeekResponse};
use mz_expr::row::RowCollection;
use mz_ore::cast::CastFrom;
use mz_ore::task::AbortOnDropHandle;
use mz_persist_client::Schemas;
use mz_persist_client::cache::PersistClientCache;
use mz_persist_types::codec_impls::UnitSchema;
use mz_persist_types::{PersistLocation, ShardId};
use mz_repr::{Diff, RelationDesc, Row, Timestamp};
use mz_storage_types::sources::SourceData;
use timely::progress::Antichain;
use tokio::sync::oneshot;
use tracing::debug;
use uuid::Uuid;

use crate::arrangement::manager::{PaddedTrace, TraceBundle};
use crate::compute_state::peek_result_iterator;
use crate::compute_state::peek_result_iterator::PeekResultIterator;
use crate::typedefs::RowRowAgent;

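/// An in-progress peek whose results are being stashed in persist instead of
/// being returned inline.
///
/// The worker reads results out of the arrangement's trace via
/// [`pump_rows`](StashingPeek::pump_rows) and forwards them to a background
/// task that uploads them to persist and produces the final [`PeekResponse`].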
pub struct StashingPeek {
    /// The peek whose results are being stashed.
    pub peek: Peek,
    /// Iterator over the peek's results; `None` once it has been exhausted.
    peek_iterator: Option<PeekResultIterator<PaddedTrace<RowRowAgent<Timestamp, Diff>>>>,
    /// Sends batches of result rows to the upload task; `None` once all rows have been sent.
    rows_tx: Option<tokio::sync::mpsc::Sender<Result<Vec<(Row, NonZeroI64)>, String>>>,
    /// Receives the final response from the upload task, along with the time the upload took.
    pub result: oneshot::Receiver<(PeekResponse, Duration)>,
    /// The `tracing::Span` under which this peek executes.
    pub span: tracing::Span,
    /// Aborts the upload task when this `StashingPeek` is dropped.
    _abort_handle: AbortOnDropHandle<()>,
}

impl StashingPeek {
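    /// Begins stashing the results of `peek` in persist: spawns a background
    /// task that uploads result rows and returns a [`StashingPeek`] through
    /// which the worker feeds rows to that task via [`Self::pump_rows`].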
    pub fn start_upload(
        persist_clients: Arc<PersistClientCache>,
        persist_location: &PersistLocation,
        mut peek: Peek,
        mut trace_bundle: TraceBundle,
        batch_max_runs: usize,
    ) -> Self {
        let (rows_tx, rows_rx) = tokio::sync::mpsc::channel(10);
        let (result_tx, result_rx) = oneshot::channel::<(PeekResponse, Duration)>();

        let persist_clients = Arc::clone(&persist_clients);
        let persist_location = persist_location.clone();

        let peek_uuid = peek.uuid;
        let relation_desc = peek.result_desc.clone();

        let oks_handle = trace_bundle.oks_mut();

        let peek_iterator = peek_result_iterator::PeekResultIterator::new(
            peek.target.id(),
            peek.map_filter_project.clone(),
            peek.timestamp,
            peek.literal_constraints.as_deref_mut(),
            oks_handle,
        );

        let rows_needed_by_finishing = peek.finishing.num_rows_needed();

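        // The actual upload happens in a background task; the worker feeds it
        // result rows incrementally via `pump_rows`.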
        let task_handle = mz_ore::task::spawn(
            || format!("peek_stash::stash_peek_response({peek_uuid})"),
            async move {
                let start = Instant::now();

                let result = Self::do_upload(
                    &persist_clients,
                    persist_location,
                    batch_max_runs,
                    peek.uuid,
                    relation_desc,
                    rows_needed_by_finishing,
                    rows_rx,
                )
                .await;

                let result = match result {
                    Ok(peek_response) => peek_response,
                    Err(e) => PeekResponse::Error(e.to_string()),
                };
                match result_tx.send((result, start.elapsed())) {
                    Ok(()) => {}
                    Err((_result, elapsed)) => {
                        debug!(duration = ?elapsed, "dropping result for cancelled peek {}", peek_uuid)
                    }
                }
            },
        );

        Self {
            peek,
            peek_iterator: Some(peek_iterator),
            rows_tx: Some(rows_tx),
            result: result_rx,
            span: tracing::Span::current(),
            _abort_handle: task_handle.abort_on_drop(),
        }
    }

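    /// Receives batches of result rows on `rows_rx` and writes them to a
    /// persist batch, returning a [`PeekResponse::Stashed`] that references
    /// the batch (or a [`PeekResponse::Error`]).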
    async fn do_upload(
        persist_clients: &PersistClientCache,
        persist_location: PersistLocation,
        batch_max_runs: usize,
        peek_uuid: Uuid,
        relation_desc: RelationDesc,
        max_rows: Option<usize>,
        mut rows_rx: tokio::sync::mpsc::Receiver<Result<Vec<(Row, NonZeroI64)>, String>>,
    ) -> Result<PeekResponse, String> {
        let client = persist_clients
            .open(persist_location)
            .await
            .map_err(|e| e.to_string())?;

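        // Derive a per-peek shard id from the peek's UUID.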
        let shard_id = format!("s{}", peek_uuid);
        let shard_id = ShardId::try_from(shard_id).expect("can parse");
        let write_schemas: Schemas<SourceData, ()> = Schemas {
            id: None,
            key: Arc::new(relation_desc.clone()),
            val: Arc::new(UnitSchema),
        };

        let result_ts = Timestamp::default();
        let lower = Antichain::from_elem(result_ts);
        let upper = Antichain::from_elem(result_ts.step_forward());

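        // Write all result rows into a persist batch. The batch is not
        // appended to the shard here; it is handed back as part of the
        // stashed response.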
        let mut batch_builder = client
            .batch_builder::<SourceData, (), Timestamp, i64>(
                shard_id,
                write_schemas,
                lower,
                Some(batch_max_runs),
            )
            .await;

        let mut num_rows: u64 = 0;

        loop {
            let row = rows_rx.recv().await;
            match row {
                Some(Ok(rows)) => {
                    for (row, diff) in rows {
                        num_rows +=
                            u64::from(NonZeroU64::try_from(diff).expect("diff fits into u64"));
                        let diff: i64 = diff.into();

                        batch_builder
                            .add(&SourceData(Ok(row)), &(), &Timestamp::default(), &diff)
                            .await
                            .expect("invalid usage");

                        if let Some(max_rows) = max_rows {
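                            // The finishing only needs this many rows, so stop
                            // adding rows once we have enough.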
                            if num_rows >= u64::cast_from(max_rows) {
                                break;
                            }
                        }
                    }
                }
                Some(Err(err)) => return Ok(PeekResponse::Error(err)),
                None => {
                    break;
                }
            }
        }

        let batch = batch_builder.finish(upper).await.expect("invalid usage");

        let stashed_response = StashedPeekResponse {
            num_rows_batches: num_rows,
            encoded_size_bytes: batch.encoded_size_bytes(),
            relation_desc,
            shard_id,
            batches: vec![batch.into_transmittable_batch()],
            inline_rows: RowCollection::new(vec![], &[]),
        };
        let result = PeekResponse::Stashed(Box::new(stashed_response));
        Ok(result)
    }

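    /// Reads up to `num_batches` batches of (up to) `batch_size` rows from the
    /// peek result iterator and forwards them to the upload task, stopping
    /// early if the channel to that task is full.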
    pub fn pump_rows(&mut self, mut num_batches: usize, batch_size: usize) {
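        // Only a bounded amount of work is done per call; the caller invokes
        // this repeatedly until the iterator is exhausted.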
        while let Some(row_iter) = self.peek_iterator.as_mut() {
            let permit = match self
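                // Try to reserve capacity on the channel to the upload task;
                // if it is full, stop pumping and retry on a later call.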
                .rows_tx
                .as_mut()
                .expect("missing rows_tx")
                .try_reserve()
            {
                Ok(permit) => permit,
                Err(_) => {
                    break;
                }
            };

            let rows: Result<Vec<_>, _> = row_iter.take(batch_size).collect();
            match rows {
                Ok(rows) if rows.is_empty() => {
                    drop(permit);
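                    // The iterator is exhausted. Dropping the sender closes the
                    // channel, which signals the upload task to finish its batch.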
                    self.peek_iterator.take();
                    self.rows_tx.take();
                    break;
                }
                Ok(rows) => {
                    permit.send(Ok(rows));
                }
                Err(e) => {
                    permit.send(Err(e));
                }
            }

            num_batches -= 1;
            if num_batches == 0 {
                break;
            }
        }
    }
}