// mz_compute_client/service.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Compute layer client and server.
11
12use std::collections::{BTreeMap, BTreeSet};
13use std::mem;
14
15use async_trait::async_trait;
16use bytesize::ByteSize;
17use differential_dataflow::consolidation::consolidate_updates;
18use differential_dataflow::lattice::Lattice;
19use mz_expr::row::RowCollection;
20use mz_ore::cast::CastInto;
21use mz_ore::soft_panic_or_log;
22use mz_ore::tracing::OpenTelemetryContext;
23use mz_repr::{Diff, GlobalId, Row};
24use mz_service::client::{GenericClient, Partitionable, PartitionedState};
25use timely::PartialOrder;
26use timely::progress::frontier::{Antichain, MutableAntichain};
27use uuid::Uuid;
28
29use crate::controller::ComputeControllerTimestamp;
30use crate::protocol::command::ComputeCommand;
31use crate::protocol::response::{
32    ComputeResponse, CopyToResponse, FrontiersResponse, PeekResponse, StashedPeekResponse,
33    SubscribeBatch, SubscribeResponse,
34};
35
/// A client to a compute server.
///
/// This is a trait alias: it carries no methods of its own and is blanket-implemented for every
/// type that implements [`GenericClient`] over compute commands and responses.
pub trait ComputeClient<T = mz_repr::Timestamp>:
    GenericClient<ComputeCommand<T>, ComputeResponse<T>>
{
}
41
/// Blanket implementation: any [`GenericClient`] over compute commands/responses is a
/// [`ComputeClient`].
impl<C, T> ComputeClient<T> for C where C: GenericClient<ComputeCommand<T>, ComputeResponse<T>> {}
43
#[async_trait]
impl<T: Send> GenericClient<ComputeCommand<T>, ComputeResponse<T>> for Box<dyn ComputeClient<T>> {
    /// Sends a command by delegating to the boxed client.
    async fn send(&mut self, cmd: ComputeCommand<T>) -> Result<(), anyhow::Error> {
        (**self).send(cmd).await
    }

    /// # Cancel safety
    ///
    /// This method is cancel safe. If `recv` is used as the event in a [`tokio::select!`]
    /// statement and some other branch completes first, it is guaranteed that no messages were
    /// received by this client.
    async fn recv(&mut self) -> Result<Option<ComputeResponse<T>>, anyhow::Error> {
        // `GenericClient::recv` is required to be cancel safe.
        (**self).recv().await
    }
}
60
/// Maintained state for partitioned compute clients.
///
/// This helper type unifies the responses of multiple partitioned workers in order to present as a
/// single worker:
///
///   * It emits `Frontiers` responses reporting the minimum/meet of frontiers reported by the
///     individual workers.
///   * It emits `PeekResponse`s and `SubscribeResponse`s reporting the union of the responses
///     received from the workers.
///
/// In the compute communication stack, this client is instantiated several times:
///
///   * One instance on the controller side, dispatching between cluster processes.
///   * One instance in each cluster process, dispatching between timely worker threads.
///
/// Note that because compute commands, except `Hello` and `UpdateConfiguration`, are only
/// sent to the first process, the cluster-side instances of `PartitionedComputeState` are not
/// guaranteed to see all compute commands. Or more specifically: The instance running inside
/// process 0 sees all commands, whereas the instances running inside the other processes only see
/// `Hello` and `UpdateConfiguration`. The `PartitionedComputeState` implementation must be
/// able to cope with this limited visibility. It does so by performing most of its state management
/// based on observed compute responses rather than commands.
#[derive(Debug)]
pub struct PartitionedComputeState<T> {
    /// Number of partitions the state machine represents.
    ///
    /// A unified peek/copy-to response is emitted only once all `parts` shards have reported.
    parts: usize,
    /// The maximum result size this state machine can return.
    ///
    /// This is updated upon receiving [`ComputeCommand::UpdateConfiguration`]s.
    /// It starts out at `u64::MAX` (i.e. effectively unlimited) until such a command arrives.
    max_result_size: u64,
    /// Tracked frontiers for indexes and sinks.
    ///
    /// Frontier tracking for a collection is initialized when the first `Frontiers` response
    /// for that collection is received. Frontier tracking is ceased when all shards have reported
    /// advancement to the empty frontier for all frontier kinds.
    ///
    /// The compute protocol requires that shards always emit `Frontiers` responses reporting empty
    /// frontiers for all frontier kinds when a collection is dropped. It further requires that no
    /// further `Frontier` responses are emitted for a collection after the empty frontiers were
    /// reported. These properties ensure that a) we always cease frontier tracking for collections
    /// that have been dropped and b) frontier tracking for a collection is not re-initialized
    /// after it was ceased.
    frontiers: BTreeMap<GlobalId, TrackedFrontiers<T>>,
    /// For each in-progress peek the response data received so far, and the set of shards that
    /// provided responses already.
    ///
    /// Tracking of responses for a peek is initialized when the first `PeekResponse` for that peek
    /// is received. Once all shards have provided a `PeekResponse`, a unified peek response is
    /// emitted and the peek tracking state is dropped again.
    ///
    /// The compute protocol requires that exactly one response is emitted for each peek. This
    /// property ensures that a) we can eventually drop the tracking state maintained for a peek
    /// and b) we won't re-initialize tracking for a peek we have already served.
    peek_responses: BTreeMap<Uuid, (PeekResponse, BTreeSet<usize>)>,
    /// For each in-progress copy-to the response data received so far, and the set of shards that
    /// provided responses already.
    ///
    /// Tracking of responses for a COPY TO is initialized when the first `CopyResponse` for that command
    /// is received. Once all shards have provided a `CopyResponse`, a unified copy response is
    /// emitted and the copy_to tracking state is dropped again.
    ///
    /// The compute protocol requires that exactly one response is emitted for each COPY TO command. This
    /// property ensures that a) we can eventually drop the tracking state maintained for a copy
    /// and b) we won't re-initialize tracking for a copy we have already served.
    copy_to_responses: BTreeMap<GlobalId, (CopyToResponse, BTreeSet<usize>)>,
    /// Tracks in-progress `SUBSCRIBE`s, and the stashed rows we are holding back until their
    /// timestamps are complete.
    ///
    /// The updates may be `Err` if any of the batches have reported an error, in which case the
    /// subscribe is permanently borked.
    ///
    /// Tracking of a subscribe is initialized when the first `SubscribeResponse` for that
    /// subscribe is received. Once all shards have emitted an "end-of-subscribe" response the
    /// subscribe tracking state is dropped again.
    ///
    /// The compute protocol requires that for a subscribe that shuts down an end-of-subscribe
    /// response is emitted:
    ///
    ///   * Either a `Batch` response reporting advancement to the empty frontier...
    ///   * ... or a `DroppedAt` response reporting that the subscribe was dropped before
    ///     completing.
    ///
    /// The compute protocol further requires that no further `SubscribeResponse`s are emitted for
    /// a subscribe after an end-of-subscribe was reported.
    ///
    /// These two properties ensure that a) once a subscribe has shut down, we can eventually drop
    /// the tracking state maintained for it and b) we won't re-initialize tracking for a subscribe
    /// we have already dropped.
    pending_subscribes: BTreeMap<GlobalId, PendingSubscribe<T>>,
}
151
152impl<T> Partitionable<ComputeCommand<T>, ComputeResponse<T>>
153    for (ComputeCommand<T>, ComputeResponse<T>)
154where
155    T: ComputeControllerTimestamp,
156{
157    type PartitionedState = PartitionedComputeState<T>;
158
159    fn new(parts: usize) -> PartitionedComputeState<T> {
160        PartitionedComputeState {
161            parts,
162            max_result_size: u64::MAX,
163            frontiers: BTreeMap::new(),
164            peek_responses: BTreeMap::new(),
165            pending_subscribes: BTreeMap::new(),
166            copy_to_responses: BTreeMap::new(),
167        }
168    }
169}
170
impl<T> PartitionedComputeState<T>
where
    T: ComputeControllerTimestamp,
{
    /// Observes commands that move past.
    ///
    /// Only `UpdateConfiguration` is acted upon here (to learn `max_result_size`). All other
    /// state management is driven by responses instead, because not every instance of this
    /// state machine sees every command (see the struct-level docs).
    pub fn observe_command(&mut self, command: &ComputeCommand<T>) {
        match command {
            ComputeCommand::UpdateConfiguration(config) => {
                if let Some(max_result_size) = config.max_result_size {
                    self.max_result_size = max_result_size;
                }
            }
            _ => {
                // We are not guaranteed to observe other compute commands. We
                // must therefore not add any logic here that relies on doing so.
            }
        }
    }

    /// Absorb a [`ComputeResponse::Frontiers`].
    ///
    /// Returns a unified `Frontiers` response if any of the global (cross-shard) frontiers
    /// advanced as a result of this shard's report, and `None` otherwise. Tracking state for
    /// the collection is dropped once all its frontiers have become empty.
    fn absorb_frontiers(
        &mut self,
        shard_id: usize,
        collection_id: GlobalId,
        frontiers: FrontiersResponse<T>,
    ) -> Option<ComputeResponse<T>> {
        let tracked = self
            .frontiers
            .entry(collection_id)
            .or_insert_with(|| TrackedFrontiers::new(self.parts));

        // For each frontier kind, fold the shard's report into the tracked state. Each update
        // yields `Some` only if the corresponding global frontier strictly advanced.
        let write_frontier = frontiers
            .write_frontier
            .and_then(|f| tracked.update_write_frontier(shard_id, &f));
        let input_frontier = frontiers
            .input_frontier
            .and_then(|f| tracked.update_input_frontier(shard_id, &f));
        let output_frontier = frontiers
            .output_frontier
            .and_then(|f| tracked.update_output_frontier(shard_id, &f));

        let frontiers = FrontiersResponse {
            write_frontier,
            input_frontier,
            output_frontier,
        };
        let result = frontiers
            .has_updates()
            .then_some(ComputeResponse::Frontiers(collection_id, frontiers));

        if tracked.all_empty() {
            // All shards have reported advancement to the empty frontier, so we do not
            // expect further updates for this collection.
            self.frontiers.remove(&collection_id);
        }

        result
    }

    /// Absorb a [`ComputeResponse::PeekResponse`].
    ///
    /// Returns the merged response once all shards have reported; `None` while responses are
    /// still outstanding. Panics if a shard reports twice for the same peek, which would violate
    /// the compute protocol.
    ///
    /// NOTE(review): the `otel_ctx` emitted with the unified response is the one from the last
    /// shard to respond — confirm that is intended.
    fn absorb_peek_response(
        &mut self,
        shard_id: usize,
        uuid: Uuid,
        response: PeekResponse,
        otel_ctx: OpenTelemetryContext,
    ) -> Option<ComputeResponse<T>> {
        // An empty `Rows` response is the identity element for the merge below.
        let (merged, ready_shards) = self.peek_responses.entry(uuid).or_insert((
            PeekResponse::Rows(RowCollection::default()),
            BTreeSet::new(),
        ));

        let first = ready_shards.insert(shard_id);
        assert!(first, "duplicate peek response");

        // Park a placeholder so we can merge by value; the placeholder is immediately
        // overwritten with the merge result.
        let resp1 = mem::replace(merged, PeekResponse::Canceled);
        *merged = merge_peek_responses(resp1, response, self.max_result_size);

        if ready_shards.len() == self.parts {
            let (response, _) = self.peek_responses.remove(&uuid).unwrap();
            Some(ComputeResponse::PeekResponse(uuid, response, otel_ctx))
        } else {
            None
        }
    }

    /// Absorb a [`ComputeResponse::CopyToResponse`].
    ///
    /// Returns the merged response once all shards have reported; `None` otherwise. When
    /// merging, `Dropped` takes precedence over `Error`, which takes precedence over row
    /// counts; row counts from all shards are summed.
    fn absorb_copy_to_response(
        &mut self,
        shard_id: usize,
        copyto_id: GlobalId,
        response: CopyToResponse,
    ) -> Option<ComputeResponse<T>> {
        use CopyToResponse::*;

        // `RowCount(0)` is the identity element for the merge below.
        let (merged, ready_shards) = self
            .copy_to_responses
            .entry(copyto_id)
            .or_insert((CopyToResponse::RowCount(0), BTreeSet::new()));

        let first = ready_shards.insert(shard_id);
        assert!(first, "duplicate copy-to response");

        // Park a placeholder so we can merge by value, then overwrite it with the result.
        let resp1 = mem::replace(merged, Dropped);
        *merged = match (resp1, response) {
            (Dropped, _) | (_, Dropped) => Dropped,
            (Error(e), _) | (_, Error(e)) => Error(e),
            (RowCount(r1), RowCount(r2)) => RowCount(r1 + r2),
        };

        if ready_shards.len() == self.parts {
            let (response, _) = self.copy_to_responses.remove(&copyto_id).unwrap();
            Some(ComputeResponse::CopyToResponse(copyto_id, response))
        } else {
            None
        }
    }

    /// Absorb a [`ComputeResponse::SubscribeResponse`].
    ///
    /// Stashes batch updates until the global (cross-shard) frontier advances past their
    /// timestamps, then ships them in a consolidated batch. Emits at most one `DroppedAt` per
    /// subscribe. Tracking state is dropped once the global frontier becomes empty.
    fn absorb_subscribe_response(
        &mut self,
        subscribe_id: GlobalId,
        response: SubscribeResponse<T>,
    ) -> Option<ComputeResponse<T>> {
        let tracked = self
            .pending_subscribes
            .entry(subscribe_id)
            .or_insert_with(|| PendingSubscribe::new(self.parts));

        let emit_response = match response {
            SubscribeResponse::Batch(batch) => {
                // Swap this shard's contribution from `lower` to `upper` in the global
                // frontier bookkeeping.
                let frontiers = &mut tracked.frontiers;
                let old_frontier = frontiers.frontier().to_owned();
                frontiers.update_iter(batch.lower.into_iter().map(|t| (t, -1)));
                frontiers.update_iter(batch.upper.into_iter().map(|t| (t, 1)));
                let new_frontier = frontiers.frontier().to_owned();

                tracked.stash(batch.updates, self.max_result_size);

                // If the frontier has advanced, it is time to announce subscribe progress. Unless
                // we have already announced that the subscribe has been dropped, in which case we
                // must keep quiet.
                if old_frontier != new_frontier && !tracked.dropped {
                    let updates = match &mut tracked.stashed_updates {
                        Ok(stashed_updates) => {
                            // The compute protocol requires us to only send out consolidated
                            // batches.
                            consolidate_updates(stashed_updates);

                            // Ship only updates whose time is now complete (i.e., not
                            // greater-or-equal to the new frontier); keep the rest stashed.
                            let mut ship = Vec::new();
                            let mut keep = Vec::new();
                            for (time, data, diff) in stashed_updates.drain(..) {
                                if new_frontier.less_equal(&time) {
                                    keep.push((time, data, diff));
                                } else {
                                    ship.push((time, data, diff));
                                }
                            }
                            tracked.stashed_updates = Ok(keep);
                            Ok(ship)
                        }
                        Err(text) => Err(text.clone()),
                    };
                    Some(ComputeResponse::SubscribeResponse(
                        subscribe_id,
                        SubscribeResponse::Batch(SubscribeBatch {
                            lower: old_frontier,
                            upper: new_frontier,
                            updates,
                        }),
                    ))
                } else {
                    None
                }
            }
            SubscribeResponse::DroppedAt(frontier) => {
                // Retract this shard's contribution to the global frontier.
                tracked
                    .frontiers
                    .update_iter(frontier.iter().map(|t| (t.clone(), -1)));

                if tracked.dropped {
                    None
                } else {
                    tracked.dropped = true;
                    Some(ComputeResponse::SubscribeResponse(
                        subscribe_id,
                        SubscribeResponse::DroppedAt(frontier),
                    ))
                }
            }
        };

        if tracked.frontiers.frontier().is_empty() {
            // All shards have reported advancement to the empty frontier or dropping, so
            // we do not expect further updates for this subscribe.
            self.pending_subscribes.remove(&subscribe_id);
        }

        emit_response
    }
}
372
373impl<T> PartitionedState<ComputeCommand<T>, ComputeResponse<T>> for PartitionedComputeState<T>
374where
375    T: ComputeControllerTimestamp,
376{
377    fn split_command(&mut self, command: ComputeCommand<T>) -> Vec<Option<ComputeCommand<T>>> {
378        self.observe_command(&command);
379
380        // As specified by the compute protocol:
381        //  * Forward `Hello` and `UpdateConfiguration` commands to all shards.
382        //  * Forward all other commands to the first shard only.
383        match command {
384            command @ ComputeCommand::Hello { .. }
385            | command @ ComputeCommand::UpdateConfiguration(_) => {
386                vec![Some(command); self.parts]
387            }
388            command => {
389                let mut r = vec![None; self.parts];
390                r[0] = Some(command);
391                r
392            }
393        }
394    }
395
396    fn absorb_response(
397        &mut self,
398        shard_id: usize,
399        message: ComputeResponse<T>,
400    ) -> Option<Result<ComputeResponse<T>, anyhow::Error>> {
401        let response = match message {
402            ComputeResponse::Frontiers(id, frontiers) => {
403                self.absorb_frontiers(shard_id, id, frontiers)
404            }
405            ComputeResponse::PeekResponse(uuid, response, otel_ctx) => {
406                self.absorb_peek_response(shard_id, uuid, response, otel_ctx)
407            }
408            ComputeResponse::SubscribeResponse(id, response) => {
409                self.absorb_subscribe_response(id, response)
410            }
411            ComputeResponse::CopyToResponse(id, response) => {
412                self.absorb_copy_to_response(shard_id, id, response)
413            }
414            response @ ComputeResponse::Status(_) => {
415                // Pass through status responses.
416                Some(response)
417            }
418        };
419
420        response.map(Ok)
421    }
422}
423
/// Tracked frontiers for an index or a sink collection.
///
/// Each frontier is maintained both as a `MutableAntichain` across all partitions and individually
/// for each partition.
///
/// In each tuple, the first element is the global frontier across all partitions and the second
/// holds each partition's individual frontier, indexed by shard ID.
#[derive(Debug)]
struct TrackedFrontiers<T> {
    /// The tracked write frontier.
    write_frontier: (MutableAntichain<T>, Vec<Antichain<T>>),
    /// The tracked input frontier.
    input_frontier: (MutableAntichain<T>, Vec<Antichain<T>>),
    /// The tracked output frontier.
    output_frontier: (MutableAntichain<T>, Vec<Antichain<T>>),
}
437
438impl<T> TrackedFrontiers<T>
439where
440    T: timely::progress::Timestamp + Lattice,
441{
442    /// Initializes frontier tracking state for a new collection.
443    fn new(parts: usize) -> Self {
444        // TODO(benesch): fix this dangerous use of `as`.
445        #[allow(clippy::as_conversions)]
446        let parts_diff = parts as i64;
447
448        let mut frontier = MutableAntichain::new();
449        frontier.update_iter([(T::minimum(), parts_diff)]);
450        let part_frontiers = vec![Antichain::from_elem(T::minimum()); parts];
451        let frontier_entry = (frontier, part_frontiers);
452
453        Self {
454            write_frontier: frontier_entry.clone(),
455            input_frontier: frontier_entry.clone(),
456            output_frontier: frontier_entry,
457        }
458    }
459
460    /// Returns whether all tracked frontiers have advanced to the empty frontier.
461    fn all_empty(&self) -> bool {
462        self.write_frontier.0.frontier().is_empty()
463            && self.input_frontier.0.frontier().is_empty()
464            && self.output_frontier.0.frontier().is_empty()
465    }
466
467    /// Updates write frontier tracking with a new shard frontier.
468    ///
469    /// If this causes the global write frontier to advance, the advanced frontier is returned.
470    fn update_write_frontier(
471        &mut self,
472        shard_id: usize,
473        new_shard_frontier: &Antichain<T>,
474    ) -> Option<Antichain<T>> {
475        Self::update_frontier(&mut self.write_frontier, shard_id, new_shard_frontier)
476    }
477
478    /// Updates input frontier tracking with a new shard frontier.
479    ///
480    /// If this causes the global input frontier to advance, the advanced frontier is returned.
481    fn update_input_frontier(
482        &mut self,
483        shard_id: usize,
484        new_shard_frontier: &Antichain<T>,
485    ) -> Option<Antichain<T>> {
486        Self::update_frontier(&mut self.input_frontier, shard_id, new_shard_frontier)
487    }
488
489    /// Updates output frontier tracking with a new shard frontier.
490    ///
491    /// If this causes the global output frontier to advance, the advanced frontier is returned.
492    fn update_output_frontier(
493        &mut self,
494        shard_id: usize,
495        new_shard_frontier: &Antichain<T>,
496    ) -> Option<Antichain<T>> {
497        Self::update_frontier(&mut self.output_frontier, shard_id, new_shard_frontier)
498    }
499
500    /// Updates the provided frontier entry with a new shard frontier.
501    fn update_frontier(
502        entry: &mut (MutableAntichain<T>, Vec<Antichain<T>>),
503        shard_id: usize,
504        new_shard_frontier: &Antichain<T>,
505    ) -> Option<Antichain<T>> {
506        let (frontier, shard_frontiers) = entry;
507
508        let old_frontier = frontier.frontier().to_owned();
509        let shard_frontier = &mut shard_frontiers[shard_id];
510        frontier.update_iter(shard_frontier.iter().map(|t| (t.clone(), -1)));
511        shard_frontier.join_assign(new_shard_frontier);
512        frontier.update_iter(shard_frontier.iter().map(|t| (t.clone(), 1)));
513
514        let new_frontier = frontier.frontier();
515
516        if PartialOrder::less_than(&old_frontier.borrow(), &new_frontier) {
517            Some(new_frontier.to_owned())
518        } else {
519            None
520        }
521    }
522}
523
#[derive(Debug)]
struct PendingSubscribe<T> {
    /// The subscribe frontiers of the partitioned shards.
    ///
    /// Initialized with one `T::minimum()` capability per shard; `Batch` and `DroppedAt`
    /// responses retract them. An empty frontier means the subscribe is finished.
    frontiers: MutableAntichain<T>,
    /// The updates we are holding back until their timestamps are complete.
    ///
    /// The updates may be `Err` if any of the batches have reported an error, in which case the
    /// subscribe is permanently borked.
    stashed_updates: Result<Vec<(T, Row, Diff)>, String>,
    /// The row size of stashed updates, for `max_result_size` checking.
    ///
    /// NOTE(review): this counter only ever grows — it is not reduced when stashed updates are
    /// shipped or consolidated — so the limit appears to apply to the subscribe's cumulative
    /// result size. Confirm that this is intended.
    stashed_result_size: usize,
    /// Whether we have already emitted a `DroppedAt` response for this subscribe.
    ///
    /// This field is used to ensure we emit such a response only once.
    dropped: bool,
}
537
538impl<T: ComputeControllerTimestamp> PendingSubscribe<T> {
539    fn new(parts: usize) -> Self {
540        let mut frontiers = MutableAntichain::new();
541        // TODO(benesch): fix this dangerous use of `as`.
542        #[allow(clippy::as_conversions)]
543        frontiers.update_iter([(T::minimum(), parts as i64)]);
544
545        Self {
546            frontiers,
547            stashed_updates: Ok(Vec::new()),
548            stashed_result_size: 0,
549            dropped: false,
550        }
551    }
552
553    /// Stash a new batch of updates.
554    ///
555    /// This also implements the short-circuit behavior of error responses, and performs
556    /// `max_result_size` checking.
557    fn stash(&mut self, new_updates: Result<Vec<(T, Row, Diff)>, String>, max_result_size: u64) {
558        match (&mut self.stashed_updates, new_updates) {
559            (Err(_), _) => {
560                // Subscribe is borked; nothing to do.
561                // TODO: Consider refreshing error?
562            }
563            (_, Err(text)) => {
564                self.stashed_updates = Err(text);
565            }
566            (Ok(stashed), Ok(new)) => {
567                let new_size: usize = new.iter().map(|(_, row, _)| row.byte_len()).sum();
568                self.stashed_result_size += new_size;
569
570                if self.stashed_result_size > max_result_size.cast_into() {
571                    self.stashed_updates = Err(format!(
572                        "total result exceeds max size of {}",
573                        ByteSize::b(max_result_size)
574                    ));
575                } else {
576                    stashed.extend(new);
577                }
578            }
579        }
580    }
581}
582
/// Merge two [`PeekResponse`]s.
///
/// Merge precedence is `Canceled` > `Error` > data responses. For data responses, the combined
/// inline byte size is checked against `max_result_size` before merging; exceeding it yields an
/// `Error`. `Rows` merge into `Rows`, a `Rows` merges into a `Stashed` response's inline rows,
/// and two `Stashed` responses merge field-wise (which requires their shard IDs and relation
/// descs to match).
///
/// NOTE(review): only inline row bytes (`inline_byte_len`) count toward the size limit here —
/// stashed batch sizes are not included; confirm that is intended.
fn merge_peek_responses(
    resp1: PeekResponse,
    resp2: PeekResponse,
    max_result_size: u64,
) -> PeekResponse {
    use PeekResponse::*;

    // Cancelations and errors short-circuit. Cancelations take precedence over errors.
    let (resp1, resp2) = match (resp1, resp2) {
        (Canceled, _) | (_, Canceled) => return Canceled,
        (Error(e), _) | (_, Error(e)) => return Error(e),
        resps => resps,
    };

    let total_byte_len = resp1.inline_byte_len() + resp2.inline_byte_len();
    if total_byte_len > max_result_size.cast_into() {
        // Note: We match on this specific error message in tests so it's important that
        // nothing else returns the same string.
        let err = format!(
            "total result exceeds max size of {}",
            ByteSize::b(max_result_size)
        );
        return Error(err);
    }

    match (resp1, resp2) {
        (Rows(mut rows1), Rows(rows2)) => {
            rows1.merge(&rows2);
            Rows(rows1)
        }
        (Rows(rows), Stashed(mut stashed)) | (Stashed(mut stashed), Rows(rows)) => {
            stashed.inline_rows.merge(&rows);
            Stashed(stashed)
        }
        (Stashed(stashed1), Stashed(stashed2)) => {
            // Deconstruct so we don't miss adding new fields. We need to be careful about
            // merging everything!
            let StashedPeekResponse {
                num_rows_batches: num_rows_batches1,
                encoded_size_bytes: encoded_size_bytes1,
                relation_desc: relation_desc1,
                shard_id: shard_id1,
                batches: mut batches1,
                inline_rows: mut inline_rows1,
            } = *stashed1;
            let StashedPeekResponse {
                num_rows_batches: num_rows_batches2,
                encoded_size_bytes: encoded_size_bytes2,
                relation_desc: relation_desc2,
                shard_id: shard_id2,
                batches: mut batches2,
                inline_rows: inline_rows2,
            } = *stashed2;

            if shard_id1 != shard_id2 {
                soft_panic_or_log!(
                    "shard IDs of stashed responses do not match: \
                             {shard_id1} != {shard_id2}"
                );
                return Error("internal error".into());
            }
            if relation_desc1 != relation_desc2 {
                soft_panic_or_log!(
                    "relation descs of stashed responses do not match: \
                             {relation_desc1:?} != {relation_desc2:?}"
                );
                return Error("internal error".into());
            }

            batches1.append(&mut batches2);
            inline_rows1.merge(&inline_rows2);

            Stashed(Box::new(StashedPeekResponse {
                num_rows_batches: num_rows_batches1 + num_rows_batches2,
                encoded_size_bytes: encoded_size_bytes1 + encoded_size_bytes2,
                relation_desc: relation_desc1,
                shard_id: shard_id1,
                batches: batches1,
                inline_rows: inline_rows1,
            }))
        }
        _ => unreachable!("handled above"),
    }
}