1use std::num::NonZeroUsize;
13
14use mz_compute_types::plan::LirId;
15use mz_expr::row::RowCollection;
16use mz_ore::cast::CastFrom;
17use mz_ore::tracing::OpenTelemetryContext;
18use mz_persist_client::batch::ProtoBatch;
19use mz_persist_types::ShardId;
20use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError, any_uuid};
21use mz_repr::{Diff, GlobalId, RelationDesc, Row};
22use mz_timely_util::progress::any_antichain;
23use proptest::prelude::{Arbitrary, any};
24use proptest::strategy::{BoxedStrategy, Just, Strategy, Union};
25use proptest_derive::Arbitrary;
26use serde::{Deserialize, Serialize};
27use timely::progress::frontier::Antichain;
28use uuid::Uuid;
29
30include!(concat!(
31 env!("OUT_DIR"),
32 "/mz_compute_client.protocol.response.rs"
33));
34
35#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
42pub enum ComputeResponse<T = mz_repr::Timestamp> {
43 Frontiers(GlobalId, FrontiersResponse<T>),
75
76 PeekResponse(Uuid, PeekResponse, OpenTelemetryContext),
93
94 SubscribeResponse(GlobalId, SubscribeResponse<T>),
127
128 CopyToResponse(GlobalId, CopyToResponse),
138
139 Status(StatusResponse),
152}
153
154impl RustType<ProtoComputeResponse> for ComputeResponse<mz_repr::Timestamp> {
155 fn into_proto(&self) -> ProtoComputeResponse {
156 use proto_compute_response::Kind::*;
157 use proto_compute_response::*;
158 ProtoComputeResponse {
159 kind: Some(match self {
160 ComputeResponse::Frontiers(id, resp) => Frontiers(ProtoFrontiersKind {
161 id: Some(id.into_proto()),
162 resp: Some(resp.into_proto()),
163 }),
164 ComputeResponse::PeekResponse(id, resp, otel_ctx) => {
165 PeekResponse(ProtoPeekResponseKind {
166 id: Some(id.into_proto()),
167 resp: Some(resp.into_proto()),
168 otel_ctx: otel_ctx.clone().into(),
169 })
170 }
171 ComputeResponse::SubscribeResponse(id, resp) => {
172 SubscribeResponse(ProtoSubscribeResponseKind {
173 id: Some(id.into_proto()),
174 resp: Some(resp.into_proto()),
175 })
176 }
177 ComputeResponse::CopyToResponse(id, resp) => {
178 CopyToResponse(ProtoCopyToResponseKind {
179 id: Some(id.into_proto()),
180 resp: Some(resp.into_proto()),
181 })
182 }
183 ComputeResponse::Status(resp) => Status(resp.into_proto()),
184 }),
185 }
186 }
187
188 fn from_proto(proto: ProtoComputeResponse) -> Result<Self, TryFromProtoError> {
189 use proto_compute_response::Kind::*;
190 match proto.kind {
191 Some(Frontiers(resp)) => Ok(ComputeResponse::Frontiers(
192 resp.id.into_rust_if_some("ProtoFrontiersKind::id")?,
193 resp.resp.into_rust_if_some("ProtoFrontiersKind::resp")?,
194 )),
195 Some(PeekResponse(resp)) => Ok(ComputeResponse::PeekResponse(
196 resp.id.into_rust_if_some("ProtoPeekResponseKind::id")?,
197 resp.resp.into_rust_if_some("ProtoPeekResponseKind::resp")?,
198 resp.otel_ctx.into(),
199 )),
200 Some(SubscribeResponse(resp)) => Ok(ComputeResponse::SubscribeResponse(
201 resp.id
202 .into_rust_if_some("ProtoSubscribeResponseKind::id")?,
203 resp.resp
204 .into_rust_if_some("ProtoSubscribeResponseKind::resp")?,
205 )),
206 Some(CopyToResponse(resp)) => Ok(ComputeResponse::CopyToResponse(
207 resp.id.into_rust_if_some("ProtoCopyToResponseKind::id")?,
208 resp.resp
209 .into_rust_if_some("ProtoCopyToResponseKind::resp")?,
210 )),
211 Some(Status(resp)) => Ok(ComputeResponse::Status(resp.into_rust()?)),
212 None => Err(TryFromProtoError::missing_field(
213 "ProtoComputeResponse::kind",
214 )),
215 }
216 }
217}
218
219impl Arbitrary for ComputeResponse<mz_repr::Timestamp> {
220 type Strategy = Union<BoxedStrategy<Self>>;
221 type Parameters = ();
222
223 fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
224 Union::new(vec![
225 (any::<GlobalId>(), any::<FrontiersResponse>())
226 .prop_map(|(id, resp)| ComputeResponse::Frontiers(id, resp))
227 .boxed(),
228 (any_uuid(), any::<PeekResponse>())
229 .prop_map(|(id, resp)| {
230 ComputeResponse::PeekResponse(id, resp, OpenTelemetryContext::empty())
231 })
232 .boxed(),
233 (any::<GlobalId>(), any::<SubscribeResponse>())
234 .prop_map(|(id, resp)| ComputeResponse::SubscribeResponse(id, resp))
235 .boxed(),
236 any::<StatusResponse>()
237 .prop_map(ComputeResponse::Status)
238 .boxed(),
239 ])
240 }
241}
242
243#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
248pub struct FrontiersResponse<T = mz_repr::Timestamp> {
249 pub write_frontier: Option<Antichain<T>>,
256 pub input_frontier: Option<Antichain<T>>,
263 pub output_frontier: Option<Antichain<T>>,
277}
278
279impl<T> FrontiersResponse<T> {
280 pub fn has_updates(&self) -> bool {
282 self.write_frontier.is_some()
283 || self.input_frontier.is_some()
284 || self.output_frontier.is_some()
285 }
286}
287
288impl RustType<ProtoFrontiersResponse> for FrontiersResponse {
289 fn into_proto(&self) -> ProtoFrontiersResponse {
290 ProtoFrontiersResponse {
291 write_frontier: self.write_frontier.into_proto(),
292 input_frontier: self.input_frontier.into_proto(),
293 output_frontier: self.output_frontier.into_proto(),
294 }
295 }
296
297 fn from_proto(proto: ProtoFrontiersResponse) -> Result<Self, TryFromProtoError> {
298 Ok(Self {
299 write_frontier: proto.write_frontier.into_rust()?,
300 input_frontier: proto.input_frontier.into_rust()?,
301 output_frontier: proto.output_frontier.into_rust()?,
302 })
303 }
304}
305
306impl Arbitrary for FrontiersResponse {
307 type Strategy = BoxedStrategy<Self>;
308 type Parameters = ();
309
310 fn arbitrary_with(_args: Self::Parameters) -> Self::Strategy {
311 (any_antichain(), any_antichain(), any_antichain())
312 .prop_map(|(write, input, compute)| Self {
313 write_frontier: Some(write),
314 input_frontier: Some(input),
315 output_frontier: Some(compute),
316 })
317 .boxed()
318 }
319}
320
321#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
326pub enum PeekResponse {
327 Rows(RowCollection),
329 Stashed(Box<StashedPeekResponse>),
331 Error(String),
333 Canceled,
335}
336
337impl PeekResponse {
338 pub fn inline_byte_len(&self) -> usize {
340 match self {
341 Self::Rows(rows) => rows.byte_len(),
342 Self::Stashed(stashed) => stashed.inline_rows.byte_len(),
343 Self::Error(_) | Self::Canceled => 0,
344 }
345 }
346}
347
348impl RustType<ProtoPeekResponse> for PeekResponse {
349 fn into_proto(&self) -> ProtoPeekResponse {
350 use proto_peek_response::Kind::*;
351 ProtoPeekResponse {
352 kind: Some(match self {
353 PeekResponse::Rows(rows) => Rows(rows.into_proto()),
354 PeekResponse::Stashed(stashed) => Stashed(stashed.as_ref().into_proto()),
355 PeekResponse::Error(err) => proto_peek_response::Kind::Error(err.clone()),
356 PeekResponse::Canceled => Canceled(()),
357 }),
358 }
359 }
360
361 fn from_proto(proto: ProtoPeekResponse) -> Result<Self, TryFromProtoError> {
362 use proto_peek_response::Kind::*;
363 match proto.kind {
364 Some(Rows(rows)) => Ok(PeekResponse::Rows(rows.into_rust()?)),
365 Some(Stashed(stashed)) => Ok(PeekResponse::Stashed(Box::new(stashed.into_rust()?))),
366 Some(proto_peek_response::Kind::Error(err)) => Ok(PeekResponse::Error(err)),
367 Some(Canceled(())) => Ok(PeekResponse::Canceled),
368 None => Err(TryFromProtoError::missing_field("ProtoPeekResponse::kind")),
369 }
370 }
371}
372
373impl Arbitrary for PeekResponse {
374 type Strategy = Union<BoxedStrategy<Self>>;
375 type Parameters = ();
376
377 fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
378 Union::new(vec![
379 proptest::collection::vec(
380 (
381 any::<Row>(),
382 (1..usize::MAX).prop_map(|u| NonZeroUsize::try_from(u).unwrap()),
383 ),
384 1..11,
385 )
386 .prop_map(|rows| PeekResponse::Rows(RowCollection::new(rows, &[])))
387 .boxed(),
388 ".*".prop_map(PeekResponse::Error).boxed(),
389 Just(PeekResponse::Canceled).boxed(),
390 ])
391 }
392}
393
394#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
396pub struct StashedPeekResponse {
397 pub num_rows_batches: u64,
402 pub encoded_size_bytes: usize,
404 pub relation_desc: RelationDesc,
406 pub shard_id: ShardId,
408 pub batches: Vec<ProtoBatch>,
411 pub inline_rows: RowCollection,
417}
418
419impl StashedPeekResponse {
420 pub fn num_rows(&self, offset: usize, limit: Option<usize>) -> usize {
423 let num_stashed_rows: usize = usize::cast_from(self.num_rows_batches);
424 let num_inline_rows = self.inline_rows.count(offset, limit);
425 let mut num_rows = num_stashed_rows + num_inline_rows;
426
427 num_rows = num_rows.saturating_sub(offset);
429
430 if let Some(limit) = limit {
432 num_rows = std::cmp::min(limit, num_rows);
433 }
434
435 num_rows
436 }
437
438 pub fn size_bytes(&self) -> usize {
440 let inline_size = self.inline_rows.byte_len();
441
442 self.encoded_size_bytes + inline_size
443 }
444}
445
446impl RustType<ProtoStashedPeekResponse> for StashedPeekResponse {
447 fn into_proto(&self) -> ProtoStashedPeekResponse {
448 ProtoStashedPeekResponse {
449 relation_desc: Some(self.relation_desc.into_proto()),
450 shard_id: self.shard_id.into_proto(),
451 batches: self.batches.clone(),
452 num_rows: self.num_rows_batches.into_proto(),
453 encoded_size_bytes: self.encoded_size_bytes.into_proto(),
454 inline_rows: Some(self.inline_rows.into_proto()),
455 }
456 }
457
458 fn from_proto(proto: ProtoStashedPeekResponse) -> Result<Self, TryFromProtoError> {
459 let shard_id: ShardId = proto
460 .shard_id
461 .into_rust()
462 .expect("valid transmittable shard_id");
463 Ok(StashedPeekResponse {
464 relation_desc: proto
465 .relation_desc
466 .into_rust_if_some("ProtoStashedPeekResponse::relation_desc")?,
467 shard_id,
468 batches: proto.batches,
469 num_rows_batches: proto.num_rows,
470 encoded_size_bytes: usize::cast_from(proto.encoded_size_bytes),
471 inline_rows: proto
472 .inline_rows
473 .into_rust_if_some("ProtoStashedPeekResponse::inline_rows")?,
474 })
475 }
476}
477
478#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
480pub enum CopyToResponse {
481 RowCount(u64),
483 Error(String),
485 Dropped,
487}
488
489impl RustType<ProtoCopyToResponse> for CopyToResponse {
490 fn into_proto(&self) -> ProtoCopyToResponse {
491 use proto_copy_to_response::Kind::*;
492 ProtoCopyToResponse {
493 kind: Some(match self {
494 CopyToResponse::RowCount(rows) => Rows(*rows),
495 CopyToResponse::Error(error) => Error(error.clone()),
496 CopyToResponse::Dropped => Dropped(()),
497 }),
498 }
499 }
500
501 fn from_proto(proto: ProtoCopyToResponse) -> Result<Self, TryFromProtoError> {
502 use proto_copy_to_response::Kind::*;
503 match proto.kind {
504 Some(Rows(rows)) => Ok(CopyToResponse::RowCount(rows)),
505 Some(Error(error)) => Ok(CopyToResponse::Error(error)),
506 Some(Dropped(())) => Ok(CopyToResponse::Dropped),
507 None => Err(TryFromProtoError::missing_field(
508 "ProtoCopyToResponse::kind",
509 )),
510 }
511 }
512}
513
514#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
516pub enum SubscribeResponse<T = mz_repr::Timestamp> {
517 Batch(SubscribeBatch<T>),
519 DroppedAt(Antichain<T>),
521}
522
523impl<T> SubscribeResponse<T> {
524 pub fn to_error_if_exceeds(&mut self, max_result_size: usize) {
526 if let SubscribeResponse::Batch(batch) = self {
527 batch.to_error_if_exceeds(max_result_size);
528 }
529 }
530}
531
532impl RustType<ProtoSubscribeResponse> for SubscribeResponse<mz_repr::Timestamp> {
533 fn into_proto(&self) -> ProtoSubscribeResponse {
534 use proto_subscribe_response::Kind::*;
535 ProtoSubscribeResponse {
536 kind: Some(match self {
537 SubscribeResponse::Batch(subscribe_batch) => Batch(subscribe_batch.into_proto()),
538 SubscribeResponse::DroppedAt(antichain) => DroppedAt(antichain.into_proto()),
539 }),
540 }
541 }
542
543 fn from_proto(proto: ProtoSubscribeResponse) -> Result<Self, TryFromProtoError> {
544 use proto_subscribe_response::Kind::*;
545 match proto.kind {
546 Some(Batch(subscribe_batch)) => {
547 Ok(SubscribeResponse::Batch(subscribe_batch.into_rust()?))
548 }
549 Some(DroppedAt(antichain)) => Ok(SubscribeResponse::DroppedAt(antichain.into_rust()?)),
550 None => Err(TryFromProtoError::missing_field(
551 "ProtoSubscribeResponse::kind",
552 )),
553 }
554 }
555}
556
557impl Arbitrary for SubscribeResponse<mz_repr::Timestamp> {
558 type Strategy = Union<BoxedStrategy<Self>>;
559 type Parameters = ();
560
561 fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
562 Union::new(vec![
563 any::<SubscribeBatch<mz_repr::Timestamp>>()
564 .prop_map(SubscribeResponse::Batch)
565 .boxed(),
566 proptest::collection::vec(any::<mz_repr::Timestamp>(), 1..4)
567 .prop_map(|antichain| SubscribeResponse::DroppedAt(Antichain::from(antichain)))
568 .boxed(),
569 ])
570 }
571}
572
573#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
575pub struct SubscribeBatch<T = mz_repr::Timestamp> {
576 pub lower: Antichain<T>,
578 pub upper: Antichain<T>,
580 pub updates: Result<Vec<(T, Row, Diff)>, String>,
584}
585
586impl<T> SubscribeBatch<T> {
587 fn to_error_if_exceeds(&mut self, max_result_size: usize) {
589 use bytesize::ByteSize;
590 if let Ok(updates) = &self.updates {
591 let total_size: usize = updates
592 .iter()
593 .map(|(_time, row, _diff)| row.byte_len())
594 .sum();
595 if total_size > max_result_size {
596 self.updates = Err(format!(
597 "result exceeds max size of {}",
598 ByteSize::b(u64::cast_from(max_result_size))
599 ));
600 }
601 }
602 }
603}
604
605impl RustType<ProtoSubscribeBatch> for SubscribeBatch<mz_repr::Timestamp> {
606 fn into_proto(&self) -> ProtoSubscribeBatch {
607 use proto_subscribe_batch::ProtoUpdate;
608 ProtoSubscribeBatch {
609 lower: Some(self.lower.into_proto()),
610 upper: Some(self.upper.into_proto()),
611 updates: Some(proto_subscribe_batch::ProtoSubscribeBatchContents {
612 kind: match &self.updates {
613 Ok(updates) => {
614 let updates = updates
615 .iter()
616 .map(|(t, r, d)| ProtoUpdate {
617 timestamp: t.into(),
618 row: Some(r.into_proto()),
619 diff: d.into_proto(),
620 })
621 .collect();
622
623 Some(
624 proto_subscribe_batch::proto_subscribe_batch_contents::Kind::Updates(
625 proto_subscribe_batch::ProtoSubscribeUpdates { updates },
626 ),
627 )
628 }
629 Err(text) => Some(
630 proto_subscribe_batch::proto_subscribe_batch_contents::Kind::Error(
631 text.clone(),
632 ),
633 ),
634 },
635 }),
636 }
637 }
638
639 fn from_proto(proto: ProtoSubscribeBatch) -> Result<Self, TryFromProtoError> {
640 Ok(SubscribeBatch {
641 lower: proto.lower.into_rust_if_some("ProtoTailUpdate::lower")?,
642 upper: proto.upper.into_rust_if_some("ProtoTailUpdate::upper")?,
643 updates: match proto.updates.unwrap().kind {
644 Some(proto_subscribe_batch::proto_subscribe_batch_contents::Kind::Updates(
645 updates,
646 )) => Ok(updates
647 .updates
648 .into_iter()
649 .map(|update| {
650 Ok((
651 update.timestamp.into(),
652 update.row.into_rust_if_some("ProtoUpdate::row")?,
653 update.diff.into(),
654 ))
655 })
656 .collect::<Result<Vec<_>, TryFromProtoError>>()?),
657 Some(proto_subscribe_batch::proto_subscribe_batch_contents::Kind::Error(text)) => {
658 Err(text)
659 }
660 None => Err(TryFromProtoError::missing_field("ProtoPeekResponse::kind"))?,
661 },
662 })
663 }
664}
665
666impl Arbitrary for SubscribeBatch<mz_repr::Timestamp> {
667 type Strategy = BoxedStrategy<Self>;
668 type Parameters = ();
669
670 fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
671 (
672 proptest::collection::vec(any::<mz_repr::Timestamp>(), 1..4),
673 proptest::collection::vec(any::<mz_repr::Timestamp>(), 1..4),
674 proptest::collection::vec(
675 (any::<mz_repr::Timestamp>(), any::<Row>(), any::<Diff>()),
676 1..4,
677 ),
678 )
679 .prop_map(|(lower, upper, updates)| SubscribeBatch {
680 lower: Antichain::from(lower),
681 upper: Antichain::from(upper),
682 updates: Ok(updates),
683 })
684 .boxed()
685 }
686}
687
688#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, Arbitrary)]
690pub enum StatusResponse {
691 OperatorHydration(OperatorHydrationStatus),
693}
694
695impl RustType<ProtoStatusResponse> for StatusResponse {
696 fn into_proto(&self) -> ProtoStatusResponse {
697 use proto_status_response::Kind;
698
699 let kind = match self {
700 Self::OperatorHydration(status) => Kind::OperatorHydration(status.into_proto()),
701 };
702 ProtoStatusResponse { kind: Some(kind) }
703 }
704
705 fn from_proto(proto: ProtoStatusResponse) -> Result<Self, TryFromProtoError> {
706 use proto_status_response::Kind;
707
708 match proto.kind {
709 Some(Kind::OperatorHydration(status)) => {
710 Ok(Self::OperatorHydration(status.into_rust()?))
711 }
712 None => Err(TryFromProtoError::missing_field(
713 "ProtoStatusResponse::kind",
714 )),
715 }
716 }
717}
718
719#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, Arbitrary)]
721pub struct OperatorHydrationStatus {
722 pub collection_id: GlobalId,
724 pub lir_id: LirId,
726 pub worker_id: usize,
728 pub hydrated: bool,
730}
731
732impl RustType<ProtoOperatorHydrationStatus> for OperatorHydrationStatus {
733 fn into_proto(&self) -> ProtoOperatorHydrationStatus {
734 ProtoOperatorHydrationStatus {
735 collection_id: Some(self.collection_id.into_proto()),
736 lir_id: self.lir_id.into_proto(),
737 worker_id: self.worker_id.into_proto(),
738 hydrated: self.hydrated.into_proto(),
739 }
740 }
741
742 fn from_proto(proto: ProtoOperatorHydrationStatus) -> Result<Self, TryFromProtoError> {
743 Ok(Self {
744 collection_id: proto
745 .collection_id
746 .into_rust_if_some("ProtoOperatorHydrationStatus::collection_id")?,
747 lir_id: proto.lir_id.into_rust()?,
748 worker_id: proto.worker_id.into_rust()?,
749 hydrated: proto.hydrated.into_rust()?,
750 })
751 }
752}
753
754#[cfg(test)]
755mod tests {
756 use mz_ore::assert_ok;
757 use mz_proto::protobuf_roundtrip;
758 use proptest::prelude::ProptestConfig;
759 use proptest::proptest;
760
761 use super::*;
762
763 #[mz_ore::test]
765 fn test_compute_response_size() {
766 assert_eq!(std::mem::size_of::<ComputeResponse>(), 120);
767 }
768
769 proptest! {
770 #![proptest_config(ProptestConfig::with_cases(32))]
771
772 #[mz_ore::test]
773 fn compute_response_protobuf_roundtrip(expect in any::<ComputeResponse<mz_repr::Timestamp>>() ) {
774 let actual = protobuf_roundtrip::<_, ProtoComputeResponse>(&expect);
775 assert_ok!(actual);
776 assert_eq!(actual.unwrap(), expect);
777 }
778 }
779}