1use std::num::NonZeroUsize;
13
14use mz_compute_types::plan::LirId;
15use mz_expr::row::RowCollection;
16use mz_ore::cast::CastFrom;
17use mz_ore::tracing::OpenTelemetryContext;
18use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError, any_uuid};
19use mz_repr::{Diff, GlobalId, Row};
20use mz_timely_util::progress::any_antichain;
21use proptest::prelude::{Arbitrary, any};
22use proptest::strategy::{BoxedStrategy, Just, Strategy, Union};
23use proptest_derive::Arbitrary;
24use serde::{Deserialize, Serialize};
25use timely::progress::frontier::Antichain;
26use uuid::Uuid;
27
28include!(concat!(
29 env!("OUT_DIR"),
30 "/mz_compute_client.protocol.response.rs"
31));
32
33#[derive(Clone, Debug, PartialEq)]
40pub enum ComputeResponse<T = mz_repr::Timestamp> {
41 Frontiers(GlobalId, FrontiersResponse<T>),
73
74 PeekResponse(Uuid, PeekResponse, OpenTelemetryContext),
91
92 SubscribeResponse(GlobalId, SubscribeResponse<T>),
125
126 CopyToResponse(GlobalId, CopyToResponse),
136
137 Status(StatusResponse),
150}
151
152impl RustType<ProtoComputeResponse> for ComputeResponse<mz_repr::Timestamp> {
153 fn into_proto(&self) -> ProtoComputeResponse {
154 use proto_compute_response::Kind::*;
155 use proto_compute_response::*;
156 ProtoComputeResponse {
157 kind: Some(match self {
158 ComputeResponse::Frontiers(id, resp) => Frontiers(ProtoFrontiersKind {
159 id: Some(id.into_proto()),
160 resp: Some(resp.into_proto()),
161 }),
162 ComputeResponse::PeekResponse(id, resp, otel_ctx) => {
163 PeekResponse(ProtoPeekResponseKind {
164 id: Some(id.into_proto()),
165 resp: Some(resp.into_proto()),
166 otel_ctx: otel_ctx.clone().into(),
167 })
168 }
169 ComputeResponse::SubscribeResponse(id, resp) => {
170 SubscribeResponse(ProtoSubscribeResponseKind {
171 id: Some(id.into_proto()),
172 resp: Some(resp.into_proto()),
173 })
174 }
175 ComputeResponse::CopyToResponse(id, resp) => {
176 CopyToResponse(ProtoCopyToResponseKind {
177 id: Some(id.into_proto()),
178 resp: Some(resp.into_proto()),
179 })
180 }
181 ComputeResponse::Status(resp) => Status(resp.into_proto()),
182 }),
183 }
184 }
185
186 fn from_proto(proto: ProtoComputeResponse) -> Result<Self, TryFromProtoError> {
187 use proto_compute_response::Kind::*;
188 match proto.kind {
189 Some(Frontiers(resp)) => Ok(ComputeResponse::Frontiers(
190 resp.id.into_rust_if_some("ProtoFrontiersKind::id")?,
191 resp.resp.into_rust_if_some("ProtoFrontiersKind::resp")?,
192 )),
193 Some(PeekResponse(resp)) => Ok(ComputeResponse::PeekResponse(
194 resp.id.into_rust_if_some("ProtoPeekResponseKind::id")?,
195 resp.resp.into_rust_if_some("ProtoPeekResponseKind::resp")?,
196 resp.otel_ctx.into(),
197 )),
198 Some(SubscribeResponse(resp)) => Ok(ComputeResponse::SubscribeResponse(
199 resp.id
200 .into_rust_if_some("ProtoSubscribeResponseKind::id")?,
201 resp.resp
202 .into_rust_if_some("ProtoSubscribeResponseKind::resp")?,
203 )),
204 Some(CopyToResponse(resp)) => Ok(ComputeResponse::CopyToResponse(
205 resp.id.into_rust_if_some("ProtoCopyToResponseKind::id")?,
206 resp.resp
207 .into_rust_if_some("ProtoCopyToResponseKind::resp")?,
208 )),
209 Some(Status(resp)) => Ok(ComputeResponse::Status(resp.into_rust()?)),
210 None => Err(TryFromProtoError::missing_field(
211 "ProtoComputeResponse::kind",
212 )),
213 }
214 }
215}
216
217impl Arbitrary for ComputeResponse<mz_repr::Timestamp> {
218 type Strategy = Union<BoxedStrategy<Self>>;
219 type Parameters = ();
220
221 fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
222 Union::new(vec![
223 (any::<GlobalId>(), any::<FrontiersResponse>())
224 .prop_map(|(id, resp)| ComputeResponse::Frontiers(id, resp))
225 .boxed(),
226 (any_uuid(), any::<PeekResponse>())
227 .prop_map(|(id, resp)| {
228 ComputeResponse::PeekResponse(id, resp, OpenTelemetryContext::empty())
229 })
230 .boxed(),
231 (any::<GlobalId>(), any::<SubscribeResponse>())
232 .prop_map(|(id, resp)| ComputeResponse::SubscribeResponse(id, resp))
233 .boxed(),
234 any::<StatusResponse>()
235 .prop_map(ComputeResponse::Status)
236 .boxed(),
237 ])
238 }
239}
240
241#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
246pub struct FrontiersResponse<T = mz_repr::Timestamp> {
247 pub write_frontier: Option<Antichain<T>>,
254 pub input_frontier: Option<Antichain<T>>,
261 pub output_frontier: Option<Antichain<T>>,
275}
276
277impl<T> FrontiersResponse<T> {
278 pub fn has_updates(&self) -> bool {
280 self.write_frontier.is_some()
281 || self.input_frontier.is_some()
282 || self.output_frontier.is_some()
283 }
284}
285
286impl RustType<ProtoFrontiersResponse> for FrontiersResponse {
287 fn into_proto(&self) -> ProtoFrontiersResponse {
288 ProtoFrontiersResponse {
289 write_frontier: self.write_frontier.into_proto(),
290 input_frontier: self.input_frontier.into_proto(),
291 output_frontier: self.output_frontier.into_proto(),
292 }
293 }
294
295 fn from_proto(proto: ProtoFrontiersResponse) -> Result<Self, TryFromProtoError> {
296 Ok(Self {
297 write_frontier: proto.write_frontier.into_rust()?,
298 input_frontier: proto.input_frontier.into_rust()?,
299 output_frontier: proto.output_frontier.into_rust()?,
300 })
301 }
302}
303
304impl Arbitrary for FrontiersResponse {
305 type Strategy = BoxedStrategy<Self>;
306 type Parameters = ();
307
308 fn arbitrary_with(_args: Self::Parameters) -> Self::Strategy {
309 (any_antichain(), any_antichain(), any_antichain())
310 .prop_map(|(write, input, compute)| Self {
311 write_frontier: Some(write),
312 input_frontier: Some(input),
313 output_frontier: Some(compute),
314 })
315 .boxed()
316 }
317}
318
319#[derive(Clone, Debug, PartialEq)]
324pub enum PeekResponse {
325 Rows(RowCollection),
327 Error(String),
329 Canceled,
331}
332
333impl RustType<ProtoPeekResponse> for PeekResponse {
334 fn into_proto(&self) -> ProtoPeekResponse {
335 use proto_peek_response::Kind::*;
336 ProtoPeekResponse {
337 kind: Some(match self {
338 PeekResponse::Rows(rows) => Rows(rows.into_proto()),
339 PeekResponse::Error(err) => proto_peek_response::Kind::Error(err.clone()),
340 PeekResponse::Canceled => Canceled(()),
341 }),
342 }
343 }
344
345 fn from_proto(proto: ProtoPeekResponse) -> Result<Self, TryFromProtoError> {
346 use proto_peek_response::Kind::*;
347 match proto.kind {
348 Some(Rows(rows)) => Ok(PeekResponse::Rows(rows.into_rust()?)),
349 Some(proto_peek_response::Kind::Error(err)) => Ok(PeekResponse::Error(err)),
350 Some(Canceled(())) => Ok(PeekResponse::Canceled),
351 None => Err(TryFromProtoError::missing_field("ProtoPeekResponse::kind")),
352 }
353 }
354}
355
356impl Arbitrary for PeekResponse {
357 type Strategy = Union<BoxedStrategy<Self>>;
358 type Parameters = ();
359
360 fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
361 Union::new(vec![
362 proptest::collection::vec(
363 (
364 any::<Row>(),
365 (1..usize::MAX).prop_map(|u| NonZeroUsize::try_from(u).unwrap()),
366 ),
367 1..11,
368 )
369 .prop_map(|rows| PeekResponse::Rows(RowCollection::new(rows, &[])))
370 .boxed(),
371 ".*".prop_map(PeekResponse::Error).boxed(),
372 Just(PeekResponse::Canceled).boxed(),
373 ])
374 }
375}
376
377#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
379pub enum CopyToResponse {
380 RowCount(u64),
382 Error(String),
384 Dropped,
386}
387
388impl RustType<ProtoCopyToResponse> for CopyToResponse {
389 fn into_proto(&self) -> ProtoCopyToResponse {
390 use proto_copy_to_response::Kind::*;
391 ProtoCopyToResponse {
392 kind: Some(match self {
393 CopyToResponse::RowCount(rows) => Rows(*rows),
394 CopyToResponse::Error(error) => Error(error.clone()),
395 CopyToResponse::Dropped => Dropped(()),
396 }),
397 }
398 }
399
400 fn from_proto(proto: ProtoCopyToResponse) -> Result<Self, TryFromProtoError> {
401 use proto_copy_to_response::Kind::*;
402 match proto.kind {
403 Some(Rows(rows)) => Ok(CopyToResponse::RowCount(rows)),
404 Some(Error(error)) => Ok(CopyToResponse::Error(error)),
405 Some(Dropped(())) => Ok(CopyToResponse::Dropped),
406 None => Err(TryFromProtoError::missing_field(
407 "ProtoCopyToResponse::kind",
408 )),
409 }
410 }
411}
412
413#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
415pub enum SubscribeResponse<T = mz_repr::Timestamp> {
416 Batch(SubscribeBatch<T>),
418 DroppedAt(Antichain<T>),
420}
421
422impl<T> SubscribeResponse<T> {
423 pub fn to_error_if_exceeds(&mut self, max_result_size: usize) {
425 if let SubscribeResponse::Batch(batch) = self {
426 batch.to_error_if_exceeds(max_result_size);
427 }
428 }
429}
430
431impl RustType<ProtoSubscribeResponse> for SubscribeResponse<mz_repr::Timestamp> {
432 fn into_proto(&self) -> ProtoSubscribeResponse {
433 use proto_subscribe_response::Kind::*;
434 ProtoSubscribeResponse {
435 kind: Some(match self {
436 SubscribeResponse::Batch(subscribe_batch) => Batch(subscribe_batch.into_proto()),
437 SubscribeResponse::DroppedAt(antichain) => DroppedAt(antichain.into_proto()),
438 }),
439 }
440 }
441
442 fn from_proto(proto: ProtoSubscribeResponse) -> Result<Self, TryFromProtoError> {
443 use proto_subscribe_response::Kind::*;
444 match proto.kind {
445 Some(Batch(subscribe_batch)) => {
446 Ok(SubscribeResponse::Batch(subscribe_batch.into_rust()?))
447 }
448 Some(DroppedAt(antichain)) => Ok(SubscribeResponse::DroppedAt(antichain.into_rust()?)),
449 None => Err(TryFromProtoError::missing_field(
450 "ProtoSubscribeResponse::kind",
451 )),
452 }
453 }
454}
455
456impl Arbitrary for SubscribeResponse<mz_repr::Timestamp> {
457 type Strategy = Union<BoxedStrategy<Self>>;
458 type Parameters = ();
459
460 fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
461 Union::new(vec![
462 any::<SubscribeBatch<mz_repr::Timestamp>>()
463 .prop_map(SubscribeResponse::Batch)
464 .boxed(),
465 proptest::collection::vec(any::<mz_repr::Timestamp>(), 1..4)
466 .prop_map(|antichain| SubscribeResponse::DroppedAt(Antichain::from(antichain)))
467 .boxed(),
468 ])
469 }
470}
471
472#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
474pub struct SubscribeBatch<T = mz_repr::Timestamp> {
475 pub lower: Antichain<T>,
477 pub upper: Antichain<T>,
479 pub updates: Result<Vec<(T, Row, Diff)>, String>,
483}
484
485impl<T> SubscribeBatch<T> {
486 fn to_error_if_exceeds(&mut self, max_result_size: usize) {
488 use bytesize::ByteSize;
489 if let Ok(updates) = &self.updates {
490 let total_size: usize = updates
491 .iter()
492 .map(|(_time, row, _diff)| row.byte_len())
493 .sum();
494 if total_size > max_result_size {
495 self.updates = Err(format!(
496 "result exceeds max size of {}",
497 ByteSize::b(u64::cast_from(max_result_size))
498 ));
499 }
500 }
501 }
502}
503
504impl RustType<ProtoSubscribeBatch> for SubscribeBatch<mz_repr::Timestamp> {
505 fn into_proto(&self) -> ProtoSubscribeBatch {
506 use proto_subscribe_batch::ProtoUpdate;
507 ProtoSubscribeBatch {
508 lower: Some(self.lower.into_proto()),
509 upper: Some(self.upper.into_proto()),
510 updates: Some(proto_subscribe_batch::ProtoSubscribeBatchContents {
511 kind: match &self.updates {
512 Ok(updates) => {
513 let updates = updates
514 .iter()
515 .map(|(t, r, d)| ProtoUpdate {
516 timestamp: t.into(),
517 row: Some(r.into_proto()),
518 diff: d.into_proto(),
519 })
520 .collect();
521
522 Some(
523 proto_subscribe_batch::proto_subscribe_batch_contents::Kind::Updates(
524 proto_subscribe_batch::ProtoSubscribeUpdates { updates },
525 ),
526 )
527 }
528 Err(text) => Some(
529 proto_subscribe_batch::proto_subscribe_batch_contents::Kind::Error(
530 text.clone(),
531 ),
532 ),
533 },
534 }),
535 }
536 }
537
538 fn from_proto(proto: ProtoSubscribeBatch) -> Result<Self, TryFromProtoError> {
539 Ok(SubscribeBatch {
540 lower: proto.lower.into_rust_if_some("ProtoTailUpdate::lower")?,
541 upper: proto.upper.into_rust_if_some("ProtoTailUpdate::upper")?,
542 updates: match proto.updates.unwrap().kind {
543 Some(proto_subscribe_batch::proto_subscribe_batch_contents::Kind::Updates(
544 updates,
545 )) => Ok(updates
546 .updates
547 .into_iter()
548 .map(|update| {
549 Ok((
550 update.timestamp.into(),
551 update.row.into_rust_if_some("ProtoUpdate::row")?,
552 update.diff.into(),
553 ))
554 })
555 .collect::<Result<Vec<_>, TryFromProtoError>>()?),
556 Some(proto_subscribe_batch::proto_subscribe_batch_contents::Kind::Error(text)) => {
557 Err(text)
558 }
559 None => Err(TryFromProtoError::missing_field("ProtoPeekResponse::kind"))?,
560 },
561 })
562 }
563}
564
565impl Arbitrary for SubscribeBatch<mz_repr::Timestamp> {
566 type Strategy = BoxedStrategy<Self>;
567 type Parameters = ();
568
569 fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
570 (
571 proptest::collection::vec(any::<mz_repr::Timestamp>(), 1..4),
572 proptest::collection::vec(any::<mz_repr::Timestamp>(), 1..4),
573 proptest::collection::vec(
574 (any::<mz_repr::Timestamp>(), any::<Row>(), any::<Diff>()),
575 1..4,
576 ),
577 )
578 .prop_map(|(lower, upper, updates)| SubscribeBatch {
579 lower: Antichain::from(lower),
580 upper: Antichain::from(upper),
581 updates: Ok(updates),
582 })
583 .boxed()
584 }
585}
586
587#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, Arbitrary)]
589pub enum StatusResponse {
590 OperatorHydration(OperatorHydrationStatus),
592}
593
594impl RustType<ProtoStatusResponse> for StatusResponse {
595 fn into_proto(&self) -> ProtoStatusResponse {
596 use proto_status_response::Kind;
597
598 let kind = match self {
599 Self::OperatorHydration(status) => Kind::OperatorHydration(status.into_proto()),
600 };
601 ProtoStatusResponse { kind: Some(kind) }
602 }
603
604 fn from_proto(proto: ProtoStatusResponse) -> Result<Self, TryFromProtoError> {
605 use proto_status_response::Kind;
606
607 match proto.kind {
608 Some(Kind::OperatorHydration(status)) => {
609 Ok(Self::OperatorHydration(status.into_rust()?))
610 }
611 None => Err(TryFromProtoError::missing_field(
612 "ProtoStatusResponse::kind",
613 )),
614 }
615 }
616}
617
618#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, Arbitrary)]
620pub struct OperatorHydrationStatus {
621 pub collection_id: GlobalId,
623 pub lir_id: LirId,
625 pub worker_id: usize,
627 pub hydrated: bool,
629}
630
631impl RustType<ProtoOperatorHydrationStatus> for OperatorHydrationStatus {
632 fn into_proto(&self) -> ProtoOperatorHydrationStatus {
633 ProtoOperatorHydrationStatus {
634 collection_id: Some(self.collection_id.into_proto()),
635 lir_id: self.lir_id.into_proto(),
636 worker_id: self.worker_id.into_proto(),
637 hydrated: self.hydrated.into_proto(),
638 }
639 }
640
641 fn from_proto(proto: ProtoOperatorHydrationStatus) -> Result<Self, TryFromProtoError> {
642 Ok(Self {
643 collection_id: proto
644 .collection_id
645 .into_rust_if_some("ProtoOperatorHydrationStatus::collection_id")?,
646 lir_id: proto.lir_id.into_rust()?,
647 worker_id: proto.worker_id.into_rust()?,
648 hydrated: proto.hydrated.into_rust()?,
649 })
650 }
651}
652
#[cfg(test)]
mod tests {
    use mz_ore::assert_ok;
    use mz_proto::protobuf_roundtrip;
    use proptest::prelude::ProptestConfig;
    use proptest::proptest;

    use super::*;

    /// Pins the in-memory size of `ComputeResponse`, so that a change to the
    /// enum's size is noticed.
    #[mz_ore::test]
    fn test_compute_response_size() {
        let size = std::mem::size_of::<ComputeResponse>();
        assert_eq!(size, 120);
    }

    proptest! {
        #![proptest_config(ProptestConfig::with_cases(32))]

        /// Encoding an arbitrary response to protobuf and decoding it again
        /// must succeed and yield the original value.
        #[mz_ore::test]
        fn compute_response_protobuf_roundtrip(expect in any::<ComputeResponse<mz_repr::Timestamp>>()) {
            let roundtripped = protobuf_roundtrip::<_, ProtoComputeResponse>(&expect);
            assert_ok!(roundtripped);
            assert_eq!(roundtripped.unwrap(), expect);
        }
    }
}