// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Compute layer client and server.

use std::collections::{BTreeMap, BTreeSet};
use std::mem;

use async_trait::async_trait;
use bytesize::ByteSize;
use differential_dataflow::lattice::Lattice;
use mz_expr::row::RowCollection;
use mz_ore::cast::CastInto;
use mz_ore::soft_panic_or_log;
use mz_ore::tracing::OpenTelemetryContext;
use mz_repr::{GlobalId, UpdateCollection};
use mz_service::client::{GenericClient, Partitionable, PartitionedState};
use timely::PartialOrder;
use timely::progress::frontier::{Antichain, MutableAntichain};
use uuid::Uuid;

use crate::controller::ComputeControllerTimestamp;
use crate::protocol::command::ComputeCommand;
use crate::protocol::response::{
    ComputeResponse, CopyToResponse, FrontiersResponse, PeekResponse, StashedPeekResponse,
    SubscribeBatch, SubscribeResponse,
};
34
/// A client to a compute server.
///
/// This is a trait alias: it carries no methods of its own and is blanket-implemented (below) for
/// every type that implements [`GenericClient`] over compute commands and responses.
pub trait ComputeClient<T = mz_repr::Timestamp>:
    GenericClient<ComputeCommand<T>, ComputeResponse<T>>
{
}

impl<C, T> ComputeClient<T> for C where C: GenericClient<ComputeCommand<T>, ComputeResponse<T>> {}
42
43#[async_trait]
44impl<T: Send> GenericClient<ComputeCommand<T>, ComputeResponse<T>> for Box<dyn ComputeClient<T>> {
45    async fn send(&mut self, cmd: ComputeCommand<T>) -> Result<(), anyhow::Error> {
46        (**self).send(cmd).await
47    }
48
49    /// # Cancel safety
50    ///
51    /// This method is cancel safe. If `recv` is used as the event in a [`tokio::select!`]
52    /// statement and some other branch completes first, it is guaranteed that no messages were
53    /// received by this client.
54    async fn recv(&mut self) -> Result<Option<ComputeResponse<T>>, anyhow::Error> {
55        // `GenericClient::recv` is required to be cancel safe.
56        (**self).recv().await
57    }
58}
59
/// Maintained state for partitioned compute clients.
///
/// This helper type unifies the responses of multiple partitioned workers in order to present as a
/// single worker:
///
///   * It emits `Frontiers` responses reporting the minimum/meet of frontiers reported by the
///     individual workers.
///   * It emits `PeekResponse`s and `SubscribeResponse`s reporting the union of the responses
///     received from the workers.
///
/// In the compute communication stack, this client is instantiated several times:
///
///   * One instance on the controller side, dispatching between cluster processes.
///   * One instance in each cluster process, dispatching between timely worker threads.
///
/// Note that because compute commands, except `Hello` and `UpdateConfiguration`, are only
/// sent to the first process, the cluster-side instances of `PartitionedComputeState` are not
/// guaranteed to see all compute commands. Or more specifically: The instance running inside
/// process 0 sees all commands, whereas the instances running inside the other processes only see
/// `Hello` and `UpdateConfiguration`. The `PartitionedComputeState` implementation must be
/// able to cope with this limited visibility. It does so by performing most of its state management
/// based on observed compute responses rather than commands.
#[derive(Debug)]
pub struct PartitionedComputeState<T> {
    /// Number of partitions the state machine represents.
    parts: usize,
    /// The maximum result size this state machine can return.
    ///
    /// Initialized to `u64::MAX` (i.e. unlimited) and updated upon receiving
    /// [`ComputeCommand::UpdateConfiguration`]s.
    max_result_size: u64,
    /// Tracked frontiers for indexes and sinks.
    ///
    /// Frontier tracking for a collection is initialized when the first `Frontiers` response
    /// for that collection is received. Frontier tracking is ceased when all shards have reported
    /// advancement to the empty frontier for all frontier kinds.
    ///
    /// The compute protocol requires that shards always emit `Frontiers` responses reporting empty
    /// frontiers for all frontier kinds when a collection is dropped. It further requires that no
    /// further `Frontiers` responses are emitted for a collection after the empty frontiers were
    /// reported. These properties ensure that a) we always cease frontier tracking for collections
    /// that have been dropped and b) frontier tracking for a collection is not re-initialized
    /// after it was ceased.
    frontiers: BTreeMap<GlobalId, TrackedFrontiers<T>>,
    /// For each in-progress peek the response data received so far, and the set of shards that
    /// provided responses already.
    ///
    /// Tracking of responses for a peek is initialized when the first `PeekResponse` for that peek
    /// is received. Once all shards have provided a `PeekResponse`, a unified peek response is
    /// emitted and the peek tracking state is dropped again.
    ///
    /// The compute protocol requires that exactly one response is emitted for each peek. This
    /// property ensures that a) we can eventually drop the tracking state maintained for a peek
    /// and b) we won't re-initialize tracking for a peek we have already served.
    peek_responses: BTreeMap<Uuid, (PeekResponse, BTreeSet<usize>)>,
    /// For each in-progress copy-to the response data received so far, and the set of shards that
    /// provided responses already.
    ///
    /// Tracking of responses for a COPY TO is initialized when the first `CopyToResponse` for that
    /// command is received. Once all shards have provided a `CopyToResponse`, a unified copy
    /// response is emitted and the copy_to tracking state is dropped again.
    ///
    /// The compute protocol requires that exactly one response is emitted for each COPY TO command. This
    /// property ensures that a) we can eventually drop the tracking state maintained for a copy
    /// and b) we won't re-initialize tracking for a copy we have already served.
    copy_to_responses: BTreeMap<GlobalId, (CopyToResponse, BTreeSet<usize>)>,
    /// Tracks in-progress `SUBSCRIBE`s, and the stashed rows we are holding back until their
    /// timestamps are complete.
    ///
    /// The updates may be `Err` if any of the batches have reported an error, in which case the
    /// subscribe is permanently borked.
    ///
    /// Tracking of a subscribe is initialized when the first `SubscribeResponse` for that
    /// subscribe is received. Once all shards have emitted an "end-of-subscribe" response the
    /// subscribe tracking state is dropped again.
    ///
    /// The compute protocol requires that for a subscribe that shuts down an end-of-subscribe
    /// response is emitted:
    ///
    ///   * Either a `Batch` response reporting advancement to the empty frontier...
    ///   * ... or a `DroppedAt` response reporting that the subscribe was dropped before
    ///     completing.
    ///
    /// The compute protocol further requires that no further `SubscribeResponse`s are emitted for
    /// a subscribe after an end-of-subscribe was reported.
    ///
    /// These two properties ensure that a) once a subscribe has shut down, we can eventually drop
    /// the tracking state maintained for it and b) we won't re-initialize tracking for a subscribe
    /// we have already dropped.
    pending_subscribes: BTreeMap<GlobalId, PendingSubscribe<T>>,
}
150
151impl<T> Partitionable<ComputeCommand<T>, ComputeResponse<T>>
152    for (ComputeCommand<T>, ComputeResponse<T>)
153where
154    T: ComputeControllerTimestamp,
155{
156    type PartitionedState = PartitionedComputeState<T>;
157
158    fn new(parts: usize) -> PartitionedComputeState<T> {
159        PartitionedComputeState {
160            parts,
161            max_result_size: u64::MAX,
162            frontiers: BTreeMap::new(),
163            peek_responses: BTreeMap::new(),
164            pending_subscribes: BTreeMap::new(),
165            copy_to_responses: BTreeMap::new(),
166        }
167    }
168}
169
impl<T> PartitionedComputeState<T>
where
    T: ComputeControllerTimestamp,
{
    /// Observes commands that move past.
    ///
    /// Only `Hello` and `UpdateConfiguration` are guaranteed to be seen here
    /// (all other commands are routed to the first shard only), so this
    /// method only maintains state derivable from those commands.
    pub fn observe_command(&mut self, command: &ComputeCommand<T>) {
        match command {
            ComputeCommand::UpdateConfiguration(config) => {
                if let Some(max_result_size) = config.max_result_size {
                    self.max_result_size = max_result_size;
                }
            }
            _ => {
                // We are not guaranteed to observe other compute commands. We
                // must therefore not add any logic here that relies on doing so.
            }
        }
    }

    /// Absorb a [`ComputeResponse::Frontiers`].
    ///
    /// Returns a unified `Frontiers` response if this shard's report advanced
    /// any of the global (aggregated) frontiers, and `None` otherwise.
    fn absorb_frontiers(
        &mut self,
        shard_id: usize,
        collection_id: GlobalId,
        frontiers: FrontiersResponse<T>,
    ) -> Option<ComputeResponse<T>> {
        // Initialize tracking on the first `Frontiers` response for this
        // collection.
        let tracked = self
            .frontiers
            .entry(collection_id)
            .or_insert_with(|| TrackedFrontiers::new(self.parts));

        // Fold each reported frontier kind into the tracked state. Each
        // `update_*` call returns `Some` only if the aggregate frontier
        // advanced as a result.
        let write_frontier = frontiers
            .write_frontier
            .and_then(|f| tracked.update_write_frontier(shard_id, &f));
        let input_frontier = frontiers
            .input_frontier
            .and_then(|f| tracked.update_input_frontier(shard_id, &f));
        let output_frontier = frontiers
            .output_frontier
            .and_then(|f| tracked.update_output_frontier(shard_id, &f));

        let frontiers = FrontiersResponse {
            write_frontier,
            input_frontier,
            output_frontier,
        };
        let result = frontiers
            .has_updates()
            .then_some(ComputeResponse::Frontiers(collection_id, frontiers));

        if tracked.all_empty() {
            // All shards have reported advancement to the empty frontier, so we do not
            // expect further updates for this collection.
            self.frontiers.remove(&collection_id);
        }

        result
    }

    /// Absorb a [`ComputeResponse::PeekResponse`].
    ///
    /// Returns the unified peek response once all shards have reported, and
    /// `None` while responses are still outstanding.
    fn absorb_peek_response(
        &mut self,
        shard_id: usize,
        uuid: Uuid,
        response: PeekResponse,
        otel_ctx: OpenTelemetryContext,
    ) -> Option<ComputeResponse<T>> {
        // The merge identity: an empty `Rows` response.
        let (merged, ready_shards) = self.peek_responses.entry(uuid).or_insert((
            PeekResponse::Rows(vec![RowCollection::default()]),
            BTreeSet::new(),
        ));

        let first = ready_shards.insert(shard_id);
        assert!(first, "duplicate peek response");

        // Move the merged response out so it can be merged by value; the
        // `Canceled` placeholder is immediately overwritten below.
        let resp1 = mem::replace(merged, PeekResponse::Canceled);
        *merged = merge_peek_responses(resp1, response, self.max_result_size);

        if ready_shards.len() == self.parts {
            // All shards have reported: emit the unified response and drop
            // the tracking state.
            let (response, _) = self.peek_responses.remove(&uuid).unwrap();
            Some(ComputeResponse::PeekResponse(uuid, response, otel_ctx))
        } else {
            None
        }
    }

    /// Absorb a [`ComputeResponse::CopyToResponse`].
    ///
    /// Returns the unified copy-to response once all shards have reported,
    /// and `None` while responses are still outstanding.
    fn absorb_copy_to_response(
        &mut self,
        shard_id: usize,
        copyto_id: GlobalId,
        response: CopyToResponse,
    ) -> Option<ComputeResponse<T>> {
        use CopyToResponse::*;

        // The merge identity: zero rows copied.
        let (merged, ready_shards) = self
            .copy_to_responses
            .entry(copyto_id)
            .or_insert((CopyToResponse::RowCount(0), BTreeSet::new()));

        let first = ready_shards.insert(shard_id);
        assert!(first, "duplicate copy-to response");

        // `Dropped` and `Error` short-circuit, with `Dropped` taking
        // precedence; row counts are summed. The `Dropped` placeholder left
        // by `mem::replace` is immediately overwritten.
        let resp1 = mem::replace(merged, Dropped);
        *merged = match (resp1, response) {
            (Dropped, _) | (_, Dropped) => Dropped,
            (Error(e), _) | (_, Error(e)) => Error(e),
            (RowCount(r1), RowCount(r2)) => RowCount(r1 + r2),
        };

        if ready_shards.len() == self.parts {
            let (response, _) = self.copy_to_responses.remove(&copyto_id).unwrap();
            Some(ComputeResponse::CopyToResponse(copyto_id, response))
        } else {
            None
        }
    }

    /// Absorb a [`ComputeResponse::SubscribeResponse`].
    ///
    /// Stashes batch updates until the overall subscribe frontier (tracked as
    /// the meet of all shard frontiers) advances beyond their times, then
    /// emits them in unified `Batch` responses. A `DroppedAt` response is
    /// forwarded at most once.
    fn absorb_subscribe_response(
        &mut self,
        subscribe_id: GlobalId,
        response: SubscribeResponse<T>,
    ) -> Option<ComputeResponse<T>> {
        let tracked = self
            .pending_subscribes
            .entry(subscribe_id)
            .or_insert_with(|| PendingSubscribe::new(self.parts));

        let emit_response = match response {
            SubscribeResponse::Batch(batch) => {
                // Account the shard's progress: retract the batch's lower
                // bound and assert its upper bound.
                let frontiers = &mut tracked.frontiers;
                let old_frontier = frontiers.frontier().to_owned();
                frontiers.update_iter(batch.lower.into_iter().map(|t| (t, -1)));
                frontiers.update_iter(batch.upper.into_iter().map(|t| (t, 1)));
                let new_frontier = frontiers.frontier().to_owned();

                tracked.stash(batch.updates, self.max_result_size);

                // If the frontier has advanced, it is time to announce subscribe progress. Unless
                // we have already announced that the subscribe has been dropped, in which case we
                // must keep quiet.
                if old_frontier != new_frontier && !tracked.dropped {
                    let updates = match &mut tracked.stashed_updates {
                        Ok(stashed_updates) => {
                            // Split each collection along the frontier, passing the prefix along.
                            let mut ship = vec![];
                            let mut keep = vec![];
                            for collection in stashed_updates.drain(..) {
                                // `partition_point` relies on the times not yet
                                // beyond `new_frontier` forming a prefix of the
                                // collection (i.e. times are suitably ordered) —
                                // assumed invariant of the stashed collections.
                                let partition_point = collection
                                    .times()
                                    .partition_point(|t| !new_frontier.less_equal(t));
                                let (ship_coll, keep_coll) = collection.split_at(partition_point);
                                if ship_coll.len() > 0 {
                                    ship.push(ship_coll);
                                }
                                if keep_coll.len() > 0 {
                                    keep.push(keep_coll);
                                }
                            }
                            tracked.stashed_updates = Ok(keep);
                            Ok(ship)
                        }
                        // The subscribe is permanently borked; propagate the
                        // error text instead of updates.
                        Err(text) => Err(text.clone()),
                    };
                    Some(ComputeResponse::SubscribeResponse(
                        subscribe_id,
                        SubscribeResponse::Batch(SubscribeBatch {
                            lower: old_frontier,
                            upper: new_frontier,
                            updates,
                        }),
                    ))
                } else {
                    None
                }
            }
            SubscribeResponse::DroppedAt(frontier) => {
                // Retract this shard's remaining contribution to the tracked
                // frontier.
                tracked
                    .frontiers
                    .update_iter(frontier.iter().map(|t| (t.clone(), -1)));

                if tracked.dropped {
                    // A `DroppedAt` response was already forwarded; keep quiet.
                    None
                } else {
                    tracked.dropped = true;
                    Some(ComputeResponse::SubscribeResponse(
                        subscribe_id,
                        SubscribeResponse::DroppedAt(frontier),
                    ))
                }
            }
        };

        if tracked.frontiers.frontier().is_empty() {
            // All shards have reported advancement to the empty frontier or dropping, so
            // we do not expect further updates for this subscribe.
            self.pending_subscribes.remove(&subscribe_id);
        }

        emit_response
    }
}
373
374impl<T> PartitionedState<ComputeCommand<T>, ComputeResponse<T>> for PartitionedComputeState<T>
375where
376    T: ComputeControllerTimestamp,
377{
378    fn split_command(&mut self, command: ComputeCommand<T>) -> Vec<Option<ComputeCommand<T>>> {
379        self.observe_command(&command);
380
381        // As specified by the compute protocol:
382        //  * Forward `Hello` and `UpdateConfiguration` commands to all shards.
383        //  * Forward all other commands to the first shard only.
384        match command {
385            command @ ComputeCommand::Hello { .. }
386            | command @ ComputeCommand::UpdateConfiguration(_) => {
387                vec![Some(command); self.parts]
388            }
389            command => {
390                let mut r = vec![None; self.parts];
391                r[0] = Some(command);
392                r
393            }
394        }
395    }
396
397    fn absorb_response(
398        &mut self,
399        shard_id: usize,
400        message: ComputeResponse<T>,
401    ) -> Option<Result<ComputeResponse<T>, anyhow::Error>> {
402        let response = match message {
403            ComputeResponse::Frontiers(id, frontiers) => {
404                self.absorb_frontiers(shard_id, id, frontiers)
405            }
406            ComputeResponse::PeekResponse(uuid, response, otel_ctx) => {
407                self.absorb_peek_response(shard_id, uuid, response, otel_ctx)
408            }
409            ComputeResponse::SubscribeResponse(id, response) => {
410                self.absorb_subscribe_response(id, response)
411            }
412            ComputeResponse::CopyToResponse(id, response) => {
413                self.absorb_copy_to_response(shard_id, id, response)
414            }
415            response @ ComputeResponse::Status(_) => {
416                // Pass through status responses.
417                Some(response)
418            }
419        };
420
421        response.map(Ok)
422    }
423}
424
/// Tracked frontiers for an index or a sink collection.
///
/// Each frontier is maintained both as a `MutableAntichain` across all partitions and individually
/// for each partition. In each tuple, the first element is the aggregate across all shards and the
/// second element holds the per-shard frontiers, indexed by shard ID.
#[derive(Debug)]
struct TrackedFrontiers<T> {
    /// The tracked write frontier.
    write_frontier: (MutableAntichain<T>, Vec<Antichain<T>>),
    /// The tracked input frontier.
    input_frontier: (MutableAntichain<T>, Vec<Antichain<T>>),
    /// The tracked output frontier.
    output_frontier: (MutableAntichain<T>, Vec<Antichain<T>>),
}
438
439impl<T> TrackedFrontiers<T>
440where
441    T: timely::progress::Timestamp + Lattice,
442{
443    /// Initializes frontier tracking state for a new collection.
444    fn new(parts: usize) -> Self {
445        // TODO(benesch): fix this dangerous use of `as`.
446        #[allow(clippy::as_conversions)]
447        let parts_diff = parts as i64;
448
449        let mut frontier = MutableAntichain::new();
450        frontier.update_iter([(T::minimum(), parts_diff)]);
451        let part_frontiers = vec![Antichain::from_elem(T::minimum()); parts];
452        let frontier_entry = (frontier, part_frontiers);
453
454        Self {
455            write_frontier: frontier_entry.clone(),
456            input_frontier: frontier_entry.clone(),
457            output_frontier: frontier_entry,
458        }
459    }
460
461    /// Returns whether all tracked frontiers have advanced to the empty frontier.
462    fn all_empty(&self) -> bool {
463        self.write_frontier.0.frontier().is_empty()
464            && self.input_frontier.0.frontier().is_empty()
465            && self.output_frontier.0.frontier().is_empty()
466    }
467
468    /// Updates write frontier tracking with a new shard frontier.
469    ///
470    /// If this causes the global write frontier to advance, the advanced frontier is returned.
471    fn update_write_frontier(
472        &mut self,
473        shard_id: usize,
474        new_shard_frontier: &Antichain<T>,
475    ) -> Option<Antichain<T>> {
476        Self::update_frontier(&mut self.write_frontier, shard_id, new_shard_frontier)
477    }
478
479    /// Updates input frontier tracking with a new shard frontier.
480    ///
481    /// If this causes the global input frontier to advance, the advanced frontier is returned.
482    fn update_input_frontier(
483        &mut self,
484        shard_id: usize,
485        new_shard_frontier: &Antichain<T>,
486    ) -> Option<Antichain<T>> {
487        Self::update_frontier(&mut self.input_frontier, shard_id, new_shard_frontier)
488    }
489
490    /// Updates output frontier tracking with a new shard frontier.
491    ///
492    /// If this causes the global output frontier to advance, the advanced frontier is returned.
493    fn update_output_frontier(
494        &mut self,
495        shard_id: usize,
496        new_shard_frontier: &Antichain<T>,
497    ) -> Option<Antichain<T>> {
498        Self::update_frontier(&mut self.output_frontier, shard_id, new_shard_frontier)
499    }
500
501    /// Updates the provided frontier entry with a new shard frontier.
502    fn update_frontier(
503        entry: &mut (MutableAntichain<T>, Vec<Antichain<T>>),
504        shard_id: usize,
505        new_shard_frontier: &Antichain<T>,
506    ) -> Option<Antichain<T>> {
507        let (frontier, shard_frontiers) = entry;
508
509        let old_frontier = frontier.frontier().to_owned();
510        let shard_frontier = &mut shard_frontiers[shard_id];
511        frontier.update_iter(shard_frontier.iter().map(|t| (t.clone(), -1)));
512        shard_frontier.join_assign(new_shard_frontier);
513        frontier.update_iter(shard_frontier.iter().map(|t| (t.clone(), 1)));
514
515        let new_frontier = frontier.frontier();
516
517        if PartialOrder::less_than(&old_frontier.borrow(), &new_frontier) {
518            Some(new_frontier.to_owned())
519        } else {
520            None
521        }
522    }
523}
524
/// State tracked for a single in-progress `SUBSCRIBE` across all shards.
#[derive(Debug)]
struct PendingSubscribe<T> {
    /// The subscribe frontiers of the partitioned shards.
    frontiers: MutableAntichain<T>,
    /// The updates we are holding back until their timestamps are complete.
    ///
    /// `Err` once any shard reported an error or the size limit was exceeded;
    /// the subscribe is then permanently borked.
    stashed_updates: Result<Vec<UpdateCollection<T>>, String>,
    /// The row size of stashed updates, for `max_result_size` checking.
    stashed_result_size: usize,
    /// Whether we have already emitted a `DroppedAt` response for this subscribe.
    ///
    /// This field is used to ensure we emit such a response only once.
    dropped: bool,
}
538
539impl<T: ComputeControllerTimestamp> PendingSubscribe<T> {
540    fn new(parts: usize) -> Self {
541        let mut frontiers = MutableAntichain::new();
542        // TODO(benesch): fix this dangerous use of `as`.
543        #[allow(clippy::as_conversions)]
544        frontiers.update_iter([(T::minimum(), parts as i64)]);
545
546        Self {
547            frontiers,
548            stashed_updates: Ok(Vec::new()),
549            stashed_result_size: 0,
550            dropped: false,
551        }
552    }
553
554    /// Stash a new batch of updates.
555    ///
556    /// This also implements the short-circuit behavior of error responses, and performs
557    /// `max_result_size` checking.
558    fn stash(
559        &mut self,
560        new_updates: Result<Vec<UpdateCollection<T>>, String>,
561        max_result_size: u64,
562    ) {
563        match (&mut self.stashed_updates, new_updates) {
564            (Err(_), _) => {
565                // Subscribe is borked; nothing to do.
566                // TODO: Consider refreshing error?
567            }
568            (_, Err(text)) => {
569                self.stashed_updates = Err(text);
570            }
571            (Ok(stashed), Ok(new)) => {
572                let new_size: usize = new.iter().map(|coll| coll.byte_len()).sum();
573                self.stashed_result_size += new_size;
574
575                if self.stashed_result_size > max_result_size.cast_into() {
576                    self.stashed_updates = Err(format!(
577                        "total result exceeds max size of {}",
578                        ByteSize::b(max_result_size)
579                    ));
580                } else {
581                    stashed.extend(new);
582                }
583            }
584        }
585    }
586}
587
588/// Merge two [`PeekResponse`]s.
589fn merge_peek_responses(
590    resp1: PeekResponse,
591    resp2: PeekResponse,
592    max_result_size: u64,
593) -> PeekResponse {
594    use PeekResponse::*;
595
596    // Cancelations and errors short-circuit. Cancelations take precedence over errors.
597    let (resp1, resp2) = match (resp1, resp2) {
598        (Canceled, _) | (_, Canceled) => return Canceled,
599        (Error(e), _) | (_, Error(e)) => return Error(e),
600        resps => resps,
601    };
602
603    let total_byte_len = resp1.inline_byte_len() + resp2.inline_byte_len();
604    if total_byte_len > max_result_size.cast_into() {
605        // Note: We match on this specific error message in tests so it's important that
606        // nothing else returns the same string.
607        let err = format!(
608            "total result exceeds max size of {}",
609            ByteSize::b(max_result_size)
610        );
611        return Error(err);
612    }
613
614    match (resp1, resp2) {
615        (Rows(mut rows1), Rows(rows2)) => {
616            rows1.extend(rows2);
617            Rows(rows1)
618        }
619        (Rows(rows), Stashed(mut stashed)) | (Stashed(mut stashed), Rows(rows)) => {
620            stashed.inline_rows.extend(rows);
621            Stashed(stashed)
622        }
623        (Stashed(stashed1), Stashed(stashed2)) => {
624            // Deconstruct so we don't miss adding new fields. We need to be careful about
625            // merging everything!
626            let StashedPeekResponse {
627                num_rows_batches: num_rows_batches1,
628                encoded_size_bytes: encoded_size_bytes1,
629                relation_desc: relation_desc1,
630                shard_id: shard_id1,
631                batches: mut batches1,
632                inline_rows: mut inline_rows1,
633            } = *stashed1;
634            let StashedPeekResponse {
635                num_rows_batches: num_rows_batches2,
636                encoded_size_bytes: encoded_size_bytes2,
637                relation_desc: relation_desc2,
638                shard_id: shard_id2,
639                batches: mut batches2,
640                inline_rows: inline_rows2,
641            } = *stashed2;
642
643            if shard_id1 != shard_id2 {
644                soft_panic_or_log!(
645                    "shard IDs of stashed responses do not match: \
646                             {shard_id1} != {shard_id2}"
647                );
648                return Error("internal error".into());
649            }
650            if relation_desc1 != relation_desc2 {
651                soft_panic_or_log!(
652                    "relation descs of stashed responses do not match: \
653                             {relation_desc1:?} != {relation_desc2:?}"
654                );
655                return Error("internal error".into());
656            }
657
658            batches1.append(&mut batches2);
659            inline_rows1.extend(inline_rows2);
660
661            Stashed(Box::new(StashedPeekResponse {
662                num_rows_batches: num_rows_batches1 + num_rows_batches2,
663                encoded_size_bytes: encoded_size_bytes1 + encoded_size_bytes2,
664                relation_desc: relation_desc1,
665                shard_id: shard_id1,
666                batches: batches1,
667                inline_rows: inline_rows1,
668            }))
669        }
670        _ => unreachable!("handled above"),
671    }
672}