1#![allow(missing_docs)]
11#![allow(clippy::as_conversions, clippy::clone_on_ref_ptr)]
14
15use std::collections::{BTreeMap, BTreeSet};
18use std::fmt::Debug;
19use std::iter;
20
21use async_trait::async_trait;
22use differential_dataflow::difference::Semigroup;
23use differential_dataflow::lattice::Lattice;
24use mz_cluster_client::ReplicaId;
25use mz_cluster_client::client::TryIntoProtocolNonce;
26use mz_ore::assert_none;
27use mz_persist_client::batch::{BatchBuilder, ProtoBatch};
28use mz_persist_client::write::WriteHandle;
29use mz_persist_types::{Codec, Codec64, StepForward};
30use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError};
31use mz_repr::{Diff, GlobalId, Row, TimestampManipulation};
32use mz_service::client::{GenericClient, Partitionable, PartitionedState};
33use mz_service::grpc::{GrpcClient, GrpcServer, ProtoServiceTypes, ResponseStream};
34use mz_storage_types::controller::CollectionMetadata;
35use mz_storage_types::oneshot_sources::OneshotIngestionRequest;
36use mz_storage_types::parameters::StorageParameters;
37use mz_storage_types::sinks::StorageSinkDesc;
38use mz_storage_types::sources::IngestionDescription;
39use mz_timely_util::progress::any_antichain;
40use proptest::prelude::{Arbitrary, any};
41use proptest::strategy::{BoxedStrategy, Strategy, Union};
42use serde::{Deserialize, Serialize};
43use smallvec::SmallVec;
44use timely::PartialOrder;
45use timely::progress::Timestamp;
46use timely::progress::frontier::{Antichain, MutableAntichain};
47use tonic::{Request, Status as TonicStatus, Streaming};
48use uuid::Uuid;
49
50use crate::client::proto_storage_server::ProtoStorage;
51use crate::metrics::ReplicaMetrics;
52use crate::statistics::{SinkStatisticsUpdate, SourceStatisticsUpdate};
53
54include!(concat!(env!("OUT_DIR"), "/mz_storage_client.client.rs"));
55
56pub trait StorageClient<T = mz_repr::Timestamp>:
58 GenericClient<StorageCommand<T>, StorageResponse<T>>
59{
60}
61
62impl<C, T> StorageClient<T> for C where C: GenericClient<StorageCommand<T>, StorageResponse<T>> {}
63
64#[async_trait]
65impl<T: Send> GenericClient<StorageCommand<T>, StorageResponse<T>> for Box<dyn StorageClient<T>> {
66 async fn send(&mut self, cmd: StorageCommand<T>) -> Result<(), anyhow::Error> {
67 (**self).send(cmd).await
68 }
69
70 async fn recv(&mut self) -> Result<Option<StorageResponse<T>>, anyhow::Error> {
76 (**self).recv().await
78 }
79}
80
81#[derive(Debug, Clone)]
82pub enum StorageProtoServiceTypes {}
83
84impl ProtoServiceTypes for StorageProtoServiceTypes {
85 type PC = ProtoStorageCommand;
86 type PR = ProtoStorageResponse;
87 type STATS = ReplicaMetrics;
88 const URL: &'static str = "/mz_storage_client.client.ProtoStorage/CommandResponseStream";
89}
90
91pub type StorageGrpcClient = GrpcClient<StorageProtoServiceTypes>;
92
93#[async_trait]
94impl<F, G> ProtoStorage for GrpcServer<F>
95where
96 F: Fn() -> G + Send + Sync + 'static,
97 G: StorageClient + 'static,
98{
99 type CommandResponseStreamStream = ResponseStream<ProtoStorageResponse>;
100
101 async fn command_response_stream(
102 &self,
103 request: Request<Streaming<ProtoStorageCommand>>,
104 ) -> Result<tonic::Response<Self::CommandResponseStreamStream>, TonicStatus> {
105 self.forward_bidi_stream(request).await
106 }
107}
108
109#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
111pub enum StorageCommand<T = mz_repr::Timestamp> {
112 Hello {
114 nonce: Uuid,
115 },
116 InitializationComplete,
119 AllowWrites,
126 UpdateConfiguration(Box<StorageParameters>),
128 RunIngestion(Box<RunIngestionCommand>),
130 AllowCompaction(GlobalId, Antichain<T>),
134 RunSink(Box<RunSinkCommand<T>>),
135 RunOneshotIngestion(Box<RunOneshotIngestion>),
141 CancelOneshotIngestion(Uuid),
150}
151
152impl<T> StorageCommand<T> {
153 pub fn installs_objects(&self) -> bool {
155 use StorageCommand::*;
156 match self {
157 Hello { .. }
158 | InitializationComplete
159 | AllowWrites
160 | UpdateConfiguration(_)
161 | AllowCompaction(_, _)
162 | CancelOneshotIngestion { .. } => false,
163 RunIngestion(_) | RunSink(_) | RunOneshotIngestion(_) => true,
167 }
168 }
169}
170
171#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
173pub struct RunIngestionCommand {
174 pub id: GlobalId,
176 pub description: IngestionDescription<CollectionMetadata>,
179}
180
181impl Arbitrary for RunIngestionCommand {
182 type Strategy = BoxedStrategy<Self>;
183 type Parameters = ();
184
185 fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
186 (
187 any::<GlobalId>(),
188 any::<IngestionDescription<CollectionMetadata>>(),
189 )
190 .prop_map(|(id, description)| Self { id, description })
191 .boxed()
192 }
193}
194
195impl RustType<ProtoRunIngestionCommand> for RunIngestionCommand {
196 fn into_proto(&self) -> ProtoRunIngestionCommand {
197 ProtoRunIngestionCommand {
198 id: Some(self.id.into_proto()),
199 description: Some(self.description.into_proto()),
200 }
201 }
202
203 fn from_proto(proto: ProtoRunIngestionCommand) -> Result<Self, TryFromProtoError> {
204 Ok(RunIngestionCommand {
205 id: proto.id.into_rust_if_some("ProtoRunIngestionCommand::id")?,
206 description: proto
207 .description
208 .into_rust_if_some("ProtoRunIngestionCommand::description")?,
209 })
210 }
211}
212
213#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
215pub struct RunOneshotIngestion {
216 pub ingestion_id: uuid::Uuid,
218 pub collection_id: GlobalId,
220 pub collection_meta: CollectionMetadata,
222 pub request: OneshotIngestionRequest,
224}
225
226impl RustType<ProtoRunOneshotIngestion> for RunOneshotIngestion {
227 fn into_proto(&self) -> ProtoRunOneshotIngestion {
228 ProtoRunOneshotIngestion {
229 ingestion_id: Some(self.ingestion_id.into_proto()),
230 collection_id: Some(self.collection_id.into_proto()),
231 storage_metadata: Some(self.collection_meta.into_proto()),
232 request: Some(self.request.into_proto()),
233 }
234 }
235
236 fn from_proto(proto: ProtoRunOneshotIngestion) -> Result<Self, TryFromProtoError> {
237 Ok(RunOneshotIngestion {
238 ingestion_id: proto
239 .ingestion_id
240 .into_rust_if_some("ProtoRunOneshotIngestion::ingestion_id")?,
241 collection_id: proto
242 .collection_id
243 .into_rust_if_some("ProtoRunOneshotIngestion::collection_id")?,
244 collection_meta: proto
245 .storage_metadata
246 .into_rust_if_some("ProtoRunOneshotIngestion::storage_metadata")?,
247 request: proto
248 .request
249 .into_rust_if_some("ProtoRunOneshotIngestion::request")?,
250 })
251 }
252}
253
254impl RustType<ProtoRunSinkCommand> for RunSinkCommand<mz_repr::Timestamp> {
255 fn into_proto(&self) -> ProtoRunSinkCommand {
256 ProtoRunSinkCommand {
257 id: Some(self.id.into_proto()),
258 description: Some(self.description.into_proto()),
259 }
260 }
261
262 fn from_proto(proto: ProtoRunSinkCommand) -> Result<Self, TryFromProtoError> {
263 Ok(RunSinkCommand {
264 id: proto.id.into_rust_if_some("ProtoRunSinkCommand::id")?,
265 description: proto
266 .description
267 .into_rust_if_some("ProtoRunSinkCommand::description")?,
268 })
269 }
270}
271
272#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
274pub struct RunSinkCommand<T> {
275 pub id: GlobalId,
276 pub description: StorageSinkDesc<CollectionMetadata, T>,
277}
278
279impl Arbitrary for RunSinkCommand<mz_repr::Timestamp> {
280 type Strategy = BoxedStrategy<Self>;
281 type Parameters = ();
282
283 fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
284 (
285 any::<GlobalId>(),
286 any::<StorageSinkDesc<CollectionMetadata, mz_repr::Timestamp>>(),
287 )
288 .prop_map(|(id, description)| Self { id, description })
289 .boxed()
290 }
291}
292
293impl RustType<ProtoStorageCommand> for StorageCommand<mz_repr::Timestamp> {
294 fn into_proto(&self) -> ProtoStorageCommand {
295 use proto_storage_command::Kind::*;
296 use proto_storage_command::*;
297 ProtoStorageCommand {
298 kind: Some(match self {
299 StorageCommand::Hello { nonce } => Hello(ProtoHello {
300 nonce: Some(nonce.into_proto()),
301 }),
302 StorageCommand::InitializationComplete => InitializationComplete(()),
303 StorageCommand::AllowWrites => AllowWrites(()),
304 StorageCommand::UpdateConfiguration(params) => {
305 UpdateConfiguration(*params.into_proto())
306 }
307 StorageCommand::AllowCompaction(id, frontier) => AllowCompaction(ProtoCompaction {
308 id: Some(id.into_proto()),
309 frontier: Some(frontier.into_proto()),
310 }),
311 StorageCommand::RunIngestion(ingestion) => RunIngestion(*ingestion.into_proto()),
312 StorageCommand::RunSink(sink) => RunSink(*sink.into_proto()),
313 StorageCommand::RunOneshotIngestion(ingestion) => {
314 RunOneshotIngestion(*ingestion.into_proto())
315 }
316 StorageCommand::CancelOneshotIngestion(uuid) => {
317 CancelOneshotIngestion(uuid.into_proto())
318 }
319 }),
320 }
321 }
322
323 fn from_proto(proto: ProtoStorageCommand) -> Result<Self, TryFromProtoError> {
324 use proto_storage_command::Kind::*;
325 use proto_storage_command::*;
326 match proto.kind {
327 Some(Hello(ProtoHello { nonce })) => Ok(StorageCommand::Hello {
328 nonce: nonce.into_rust_if_some("ProtoHello::nonce")?,
329 }),
330 Some(InitializationComplete(())) => Ok(StorageCommand::InitializationComplete),
331 Some(AllowWrites(())) => Ok(StorageCommand::AllowWrites),
332 Some(UpdateConfiguration(params)) => {
333 let params = Box::new(params.into_rust()?);
334 Ok(StorageCommand::UpdateConfiguration(params))
335 }
336 Some(RunIngestion(ingestion)) => {
337 let ingestion = Box::new(ingestion.into_rust()?);
338 Ok(StorageCommand::RunIngestion(ingestion))
339 }
340 Some(AllowCompaction(ProtoCompaction { id, frontier })) => {
341 Ok(StorageCommand::AllowCompaction(
342 id.into_rust_if_some("ProtoCompaction::id")?,
343 frontier.into_rust_if_some("ProtoCompaction::frontier")?,
344 ))
345 }
346 Some(RunSink(sink)) => {
347 let sink = Box::new(sink.into_rust()?);
348 Ok(StorageCommand::RunSink(sink))
349 }
350 Some(RunOneshotIngestion(ingestion)) => {
351 let ingestion = Box::new(ingestion.into_rust()?);
352 Ok(StorageCommand::RunOneshotIngestion(ingestion))
353 }
354 Some(CancelOneshotIngestion(uuid)) => {
355 Ok(StorageCommand::CancelOneshotIngestion(uuid.into_rust()?))
356 }
357 None => Err(TryFromProtoError::missing_field(
358 "ProtoStorageCommand::kind",
359 )),
360 }
361 }
362}
363
364impl Arbitrary for StorageCommand<mz_repr::Timestamp> {
365 type Strategy = Union<BoxedStrategy<Self>>;
366 type Parameters = ();
367
368 fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
369 Union::new(vec![
370 any::<u128>()
371 .prop_map(|n| StorageCommand::Hello {
372 nonce: Uuid::from_u128(n),
373 })
374 .boxed(),
375 any::<RunIngestionCommand>()
376 .prop_map(Box::new)
377 .prop_map(StorageCommand::RunIngestion)
378 .boxed(),
379 any::<RunSinkCommand<mz_repr::Timestamp>>()
380 .prop_map(Box::new)
381 .prop_map(StorageCommand::RunSink)
382 .boxed(),
383 (any::<GlobalId>(), any_antichain())
384 .prop_map(|(id, frontier)| StorageCommand::AllowCompaction(id, frontier))
385 .boxed(),
386 ])
387 }
388}
389
390#[derive(Copy, Clone, Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
392pub enum Status {
393 Starting,
394 Running,
395 Paused,
396 Stalled,
397 Ceased,
400 Dropped,
401}
402
403impl std::str::FromStr for Status {
404 type Err = anyhow::Error;
405 fn from_str(s: &str) -> Result<Self, Self::Err> {
407 Ok(match s {
408 "starting" => Status::Starting,
409 "running" => Status::Running,
410 "paused" => Status::Paused,
411 "stalled" => Status::Stalled,
412 "ceased" => Status::Ceased,
413 "dropped" => Status::Dropped,
414 s => return Err(anyhow::anyhow!("{} is not a valid status", s)),
415 })
416 }
417}
418
419impl Status {
420 pub fn to_str(&self) -> &'static str {
422 match self {
423 Status::Starting => "starting",
424 Status::Running => "running",
425 Status::Paused => "paused",
426 Status::Stalled => "stalled",
427 Status::Ceased => "ceased",
428 Status::Dropped => "dropped",
429 }
430 }
431
432 pub fn superseded_by(self, new: Status) -> bool {
435 match (self, new) {
436 (_, Status::Dropped) => true,
437 (Status::Dropped, _) => false,
438 (Status::Paused, Status::Paused) => false,
440 _ => true,
443 }
444 }
445}
446
447#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
453pub struct StatusUpdate {
454 pub id: GlobalId,
455 pub status: Status,
456 pub timestamp: chrono::DateTime<chrono::Utc>,
457 pub error: Option<String>,
458 pub hints: BTreeSet<String>,
459 pub namespaced_errors: BTreeMap<String, String>,
460 pub replica_id: Option<ReplicaId>,
461}
462
463impl StatusUpdate {
464 pub fn new(
465 id: GlobalId,
466 timestamp: chrono::DateTime<chrono::Utc>,
467 status: Status,
468 ) -> StatusUpdate {
469 StatusUpdate {
470 id,
471 timestamp,
472 status,
473 error: None,
474 hints: Default::default(),
475 namespaced_errors: Default::default(),
476 replica_id: None,
477 }
478 }
479}
480
481impl From<StatusUpdate> for Row {
482 fn from(update: StatusUpdate) -> Self {
483 use mz_repr::Datum;
484
485 let timestamp = Datum::TimestampTz(update.timestamp.try_into().expect("must fit"));
486 let id = update.id.to_string();
487 let id = Datum::String(&id);
488 let status = Datum::String(update.status.to_str());
489 let error = update.error.as_deref().into();
490
491 let mut row = Row::default();
492 let mut packer = row.packer();
493 packer.extend([timestamp, id, status, error]);
494
495 if !update.hints.is_empty() || !update.namespaced_errors.is_empty() {
496 packer.push_dict_with(|dict_packer| {
497 if !update.hints.is_empty() {
500 dict_packer.push(Datum::String("hints"));
501 dict_packer.push_list(update.hints.iter().map(|s| Datum::String(s)));
502 }
503 if !update.namespaced_errors.is_empty() {
504 dict_packer.push(Datum::String("namespaced"));
505 dict_packer.push_dict(
506 update
507 .namespaced_errors
508 .iter()
509 .map(|(k, v)| (k.as_str(), Datum::String(v))),
510 );
511 }
512 });
513 } else {
514 packer.push(Datum::Null);
515 }
516
517 match update.replica_id {
518 Some(id) => packer.push(Datum::String(&id.to_string())),
519 None => packer.push(Datum::Null),
520 }
521
522 row
523 }
524}
525
526impl RustType<proto_storage_response::ProtoStatus> for Status {
527 fn into_proto(&self) -> proto_storage_response::ProtoStatus {
528 use proto_storage_response::proto_status::*;
529
530 proto_storage_response::ProtoStatus {
531 kind: Some(match self {
532 Status::Starting => Kind::Starting(()),
533 Status::Running => Kind::Running(()),
534 Status::Paused => Kind::Paused(()),
535 Status::Stalled => Kind::Stalled(()),
536 Status::Ceased => Kind::Ceased(()),
537 Status::Dropped => Kind::Dropped(()),
538 }),
539 }
540 }
541
542 fn from_proto(proto: proto_storage_response::ProtoStatus) -> Result<Self, TryFromProtoError> {
543 use proto_storage_response::proto_status::*;
544 let kind = proto
545 .kind
546 .ok_or_else(|| TryFromProtoError::missing_field("ProtoStatus::kind"))?;
547
548 Ok(match kind {
549 Kind::Starting(()) => Status::Starting,
550 Kind::Running(()) => Status::Running,
551 Kind::Paused(()) => Status::Paused,
552 Kind::Stalled(()) => Status::Stalled,
553 Kind::Ceased(()) => Status::Ceased,
554 Kind::Dropped(()) => Status::Dropped,
555 })
556 }
557}
558
559impl RustType<proto_storage_response::ProtoStatusUpdate> for StatusUpdate {
560 fn into_proto(&self) -> proto_storage_response::ProtoStatusUpdate {
561 proto_storage_response::ProtoStatusUpdate {
562 id: Some(self.id.into_proto()),
563 status: Some(self.status.into_proto()),
564 timestamp: Some(self.timestamp.into_proto()),
565 error: self.error.clone(),
566 hints: self.hints.iter().cloned().collect(),
567 namespaced_errors: self.namespaced_errors.clone(),
568 replica_id: self.replica_id.map(|id| id.to_string().into_proto()),
569 }
570 }
571
572 fn from_proto(
573 proto: proto_storage_response::ProtoStatusUpdate,
574 ) -> Result<Self, TryFromProtoError> {
575 Ok(StatusUpdate {
576 id: proto.id.into_rust_if_some("ProtoStatusUpdate::id")?,
577 timestamp: proto
578 .timestamp
579 .into_rust_if_some("ProtoStatusUpdate::timestamp")?,
580 status: proto
581 .status
582 .into_rust_if_some("ProtoStatusUpdate::status")?,
583 error: proto.error,
584 hints: proto.hints.into_iter().collect(),
585 namespaced_errors: proto.namespaced_errors,
586 replica_id: proto
587 .replica_id
588 .map(|replica_id: String| replica_id.parse().expect("must be a valid replica id")),
589 })
590 }
591}
592
593pub enum AppendOnlyUpdate {
595 Row((Row, Diff)),
596 Status(StatusUpdate),
597}
598
599impl AppendOnlyUpdate {
600 pub fn into_row(self) -> (Row, Diff) {
601 match self {
602 AppendOnlyUpdate::Row((row, diff)) => (row, diff),
603 AppendOnlyUpdate::Status(status) => (Row::from(status), Diff::ONE),
604 }
605 }
606}
607
608impl From<(Row, Diff)> for AppendOnlyUpdate {
609 fn from((row, diff): (Row, Diff)) -> Self {
610 Self::Row((row, diff))
611 }
612}
613
614impl From<StatusUpdate> for AppendOnlyUpdate {
615 fn from(update: StatusUpdate) -> Self {
616 Self::Status(update)
617 }
618}
619
620#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
622pub enum StorageResponse<T = mz_repr::Timestamp> {
623 FrontierUpper(GlobalId, Antichain<T>),
625 DroppedId(GlobalId),
627 StagedBatches(BTreeMap<uuid::Uuid, Vec<Result<ProtoBatch, String>>>),
629 StatisticsUpdates(Vec<SourceStatisticsUpdate>, Vec<SinkStatisticsUpdate>),
631 StatusUpdate(StatusUpdate),
634}
635
636impl RustType<ProtoStorageResponse> for StorageResponse<mz_repr::Timestamp> {
637 fn into_proto(&self) -> ProtoStorageResponse {
638 use proto_storage_response::Kind::*;
639 use proto_storage_response::{
640 ProtoDroppedId, ProtoFrontierUpper, ProtoStagedBatches, ProtoStatisticsUpdates,
641 };
642 ProtoStorageResponse {
643 kind: Some(match self {
644 StorageResponse::FrontierUpper(id, upper) => FrontierUpper(ProtoFrontierUpper {
645 id: Some(id.into_proto()),
646 upper: Some(upper.into_proto()),
647 }),
648 StorageResponse::DroppedId(id) => DroppedId(ProtoDroppedId {
649 id: Some(id.into_proto()),
650 }),
651 StorageResponse::StatisticsUpdates(source_stats, sink_stats) => {
652 Stats(ProtoStatisticsUpdates {
653 source_updates: source_stats
654 .iter()
655 .map(|update| update.into_proto())
656 .collect(),
657 sink_updates: sink_stats
658 .iter()
659 .map(|update| update.into_proto())
660 .collect(),
661 })
662 }
663 StorageResponse::StatusUpdate(update) => StatusUpdate(update.into_proto()),
664 StorageResponse::StagedBatches(staged) => {
665 let batches = staged
666 .into_iter()
667 .map(|(collection_id, batches)| {
668 let batches = batches
669 .into_iter()
670 .map(|result| {
671 use proto_storage_response::proto_staged_batches::batch_result::Value;
672 let value = match result {
673 Ok(batch) => Value::Batch(batch.clone()),
674 Err(err) => Value::Error(err.clone()),
675 };
676 proto_storage_response::proto_staged_batches::BatchResult { value: Some(value) }
677 })
678 .collect();
679 proto_storage_response::proto_staged_batches::Inner {
680 id: Some(collection_id.into_proto()),
681 batches,
682 }
683 })
684 .collect();
685 StagedBatches(ProtoStagedBatches { batches })
686 }
687 }),
688 }
689 }
690
691 fn from_proto(proto: ProtoStorageResponse) -> Result<Self, TryFromProtoError> {
692 use proto_storage_response::Kind::*;
693 use proto_storage_response::{ProtoDroppedId, ProtoFrontierUpper};
694 match proto.kind {
695 Some(DroppedId(ProtoDroppedId { id })) => Ok(StorageResponse::DroppedId(
696 id.into_rust_if_some("ProtoDroppedId::id")?,
697 )),
698 Some(FrontierUpper(ProtoFrontierUpper { id, upper })) => {
699 Ok(StorageResponse::FrontierUpper(
700 id.into_rust_if_some("ProtoFrontierUpper::id")?,
701 upper.into_rust_if_some("ProtoFrontierUpper::upper")?,
702 ))
703 }
704 Some(Stats(stats)) => Ok(StorageResponse::StatisticsUpdates(
705 stats
706 .source_updates
707 .into_iter()
708 .map(|update| update.into_rust())
709 .collect::<Result<Vec<_>, TryFromProtoError>>()?,
710 stats
711 .sink_updates
712 .into_iter()
713 .map(|update| update.into_rust())
714 .collect::<Result<Vec<_>, TryFromProtoError>>()?,
715 )),
716 Some(StatusUpdate(update)) => Ok(StorageResponse::StatusUpdate(update.into_rust()?)),
717 Some(StagedBatches(staged)) => {
718 let batches: BTreeMap<_, _> = staged
719 .batches
720 .into_iter()
721 .map(|inner| {
722 let id = inner
723 .id
724 .into_rust_if_some("ProtoStagedBatches::Inner::id")?;
725
726 let mut batches = Vec::with_capacity(inner.batches.len());
727 for maybe_batch in inner.batches {
728 use proto_storage_response::proto_staged_batches::batch_result::Value;
729
730 let value = maybe_batch.value.ok_or_else(|| {
731 TryFromProtoError::missing_field("BatchResult::value")
732 })?;
733 let batch = match value {
734 Value::Batch(batch) => Ok(batch),
735 Value::Error(err) => Err(err),
736 };
737 batches.push(batch);
738 }
739
740 Ok::<_, TryFromProtoError>((id, batches))
741 })
742 .collect::<Result<_, _>>()?;
743
744 Ok(StorageResponse::StagedBatches(batches))
745 }
746 None => Err(TryFromProtoError::missing_field(
747 "ProtoStorageResponse::kind",
748 )),
749 }
750 }
751}
752
753impl Arbitrary for StorageResponse<mz_repr::Timestamp> {
754 type Strategy = Union<BoxedStrategy<Self>>;
755 type Parameters = ();
756
757 fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
758 Union::new(vec![
760 (any::<GlobalId>(), any_antichain())
761 .prop_map(|(id, upper)| StorageResponse::FrontierUpper(id, upper))
762 .boxed(),
763 ])
764 }
765}
766
767#[derive(Debug)]
772pub struct PartitionedStorageState<T> {
773 parts: usize,
775 uppers: BTreeMap<GlobalId, (MutableAntichain<T>, Vec<Option<Antichain<T>>>)>,
778 oneshot_source_responses:
780 BTreeMap<uuid::Uuid, BTreeMap<usize, Vec<Result<ProtoBatch, String>>>>,
781}
782
783impl<T> Partitionable<StorageCommand<T>, StorageResponse<T>>
784 for (StorageCommand<T>, StorageResponse<T>)
785where
786 T: timely::progress::Timestamp + Lattice,
787{
788 type PartitionedState = PartitionedStorageState<T>;
789
790 fn new(parts: usize) -> PartitionedStorageState<T> {
791 PartitionedStorageState {
792 parts,
793 uppers: BTreeMap::new(),
794 oneshot_source_responses: BTreeMap::new(),
795 }
796 }
797}
798
799impl<T> PartitionedStorageState<T>
800where
801 T: timely::progress::Timestamp,
802{
803 fn observe_command(&mut self, command: &StorageCommand<T>) {
804 let _ = match command {
811 StorageCommand::Hello { .. } => {}
812 StorageCommand::RunIngestion(ingestion) => {
813 self.insert_new_uppers(ingestion.description.collection_ids());
814 }
815 StorageCommand::RunSink(export) => {
816 self.insert_new_uppers([export.id]);
817 }
818 StorageCommand::InitializationComplete
819 | StorageCommand::AllowWrites
820 | StorageCommand::UpdateConfiguration(_)
821 | StorageCommand::AllowCompaction(_, _)
822 | StorageCommand::RunOneshotIngestion(_)
823 | StorageCommand::CancelOneshotIngestion { .. } => {}
824 };
825 }
826
827 fn insert_new_uppers<I: IntoIterator<Item = GlobalId>>(&mut self, ids: I) {
833 for id in ids {
834 self.uppers.entry(id).or_insert_with(|| {
835 let mut frontier = MutableAntichain::new();
836 #[allow(clippy::as_conversions)]
839 frontier.update_iter(iter::once((T::minimum(), self.parts as i64)));
840 let part_frontiers = vec![Some(Antichain::from_elem(T::minimum())); self.parts];
841
842 (frontier, part_frontiers)
843 });
844 }
845 }
846}
847
848impl<T> PartitionedState<StorageCommand<T>, StorageResponse<T>> for PartitionedStorageState<T>
849where
850 T: timely::progress::Timestamp + Lattice,
851{
852 fn split_command(&mut self, command: StorageCommand<T>) -> Vec<Option<StorageCommand<T>>> {
853 self.observe_command(&command);
854
855 vec![Some(command); self.parts]
858 }
859
860 fn absorb_response(
861 &mut self,
862 shard_id: usize,
863 response: StorageResponse<T>,
864 ) -> Option<Result<StorageResponse<T>, anyhow::Error>> {
865 match response {
866 StorageResponse::FrontierUpper(id, new_shard_upper) => {
868 let (frontier, shard_frontiers) = match self.uppers.get_mut(&id) {
869 Some(value) => value,
870 None => panic!("Reference to absent collection: {id}"),
871 };
872 let old_upper = frontier.frontier().to_owned();
873 let shard_upper = match &mut shard_frontiers[shard_id] {
874 Some(shard_upper) => shard_upper,
875 None => panic!("Reference to absent shard {shard_id} for collection {id}"),
876 };
877 frontier.update_iter(shard_upper.iter().map(|t| (t.clone(), -1)));
878 frontier.update_iter(new_shard_upper.iter().map(|t| (t.clone(), 1)));
879 shard_upper.join_assign(&new_shard_upper);
880
881 let new_upper = frontier.frontier();
882 if PartialOrder::less_than(&old_upper.borrow(), &new_upper) {
883 Some(Ok(StorageResponse::FrontierUpper(id, new_upper.to_owned())))
884 } else {
885 None
886 }
887 }
888 StorageResponse::DroppedId(id) => {
889 let (_, shard_frontiers) = match self.uppers.get_mut(&id) {
890 Some(value) => value,
891 None => panic!("Reference to absent collection: {id}"),
892 };
893 let prev = shard_frontiers[shard_id].take();
894 assert!(
895 prev.is_some(),
896 "got double drop for {id} from shard {shard_id}"
897 );
898
899 if shard_frontiers.iter().all(Option::is_none) {
900 self.uppers.remove(&id);
901 Some(Ok(StorageResponse::DroppedId(id)))
902 } else {
903 None
904 }
905 }
906 StorageResponse::StatisticsUpdates(source_stats, sink_stats) => {
907 Some(Ok(StorageResponse::StatisticsUpdates(
911 source_stats,
912 sink_stats,
913 )))
914 }
915 StorageResponse::StatusUpdate(updates) => {
916 Some(Ok(StorageResponse::StatusUpdate(updates)))
917 }
918 StorageResponse::StagedBatches(batches) => {
919 let mut finished_batches = BTreeMap::new();
920
921 for (collection_id, batches) in batches {
922 tracing::info!(%shard_id, %collection_id, "got batch");
923
924 let entry = self
925 .oneshot_source_responses
926 .entry(collection_id)
927 .or_default();
928 let novel = entry.insert(shard_id, batches);
929 assert_none!(novel, "Duplicate oneshot source response");
930
931 if entry.len() == self.parts {
933 let entry = self
934 .oneshot_source_responses
935 .remove(&collection_id)
936 .expect("checked above");
937 let all_batches: Vec<_> = entry.into_values().flatten().collect();
938
939 finished_batches.insert(collection_id, all_batches);
940 }
941 }
942
943 if !finished_batches.is_empty() {
944 Some(Ok(StorageResponse::StagedBatches(finished_batches)))
945 } else {
946 None
947 }
948 }
949 }
950 }
951}
952
953#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
954pub struct Update<T = mz_repr::Timestamp> {
956 pub row: Row,
957 pub timestamp: T,
958 pub diff: Diff,
959}
960
961#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
962pub struct TimestamplessUpdate {
967 pub row: Row,
968 pub diff: Diff,
969}
970
971#[derive(Debug, Clone, PartialEq)]
972pub enum TableData {
973 Rows(Vec<(Row, Diff)>),
977 Batches(SmallVec<[ProtoBatch; 1]>),
979}
980
981impl TableData {
982 pub fn is_empty(&self) -> bool {
983 match self {
984 TableData::Rows(rows) => rows.is_empty(),
985 TableData::Batches(batches) => batches.is_empty(),
986 }
987 }
988}
989
990pub struct TimestamplessUpdateBuilder<K, V, T, D>
993where
994 K: Codec,
995 V: Codec,
996 T: Timestamp + Lattice + Codec64,
997 D: Codec64,
998{
999 builder: BatchBuilder<K, V, T, D>,
1000 initial_ts: T,
1001}
1002
1003impl<K, V, T, D> TimestamplessUpdateBuilder<K, V, T, D>
1004where
1005 K: Debug + Codec,
1006 V: Debug + Codec,
1007 T: TimestampManipulation + Lattice + Codec64 + Sync,
1008 D: Semigroup + Ord + Codec64 + Send + Sync,
1009{
1010 pub fn new(handle: &WriteHandle<K, V, T, D>) -> Self {
1013 let initial_ts = T::minimum();
1014 let builder = handle.builder(Antichain::from_elem(initial_ts.clone()));
1015 TimestamplessUpdateBuilder {
1016 builder,
1017 initial_ts,
1018 }
1019 }
1020
1021 pub async fn add(&mut self, k: &K, v: &V, d: &D) {
1023 self.builder
1024 .add(k, v, &self.initial_ts, d)
1025 .await
1026 .expect("invalid Persist usage");
1027 }
1028
1029 pub async fn finish(self) -> ProtoBatch {
1034 let finish_ts = StepForward::step_forward(&self.initial_ts);
1035 let batch = self
1036 .builder
1037 .finish(Antichain::from_elem(finish_ts))
1038 .await
1039 .expect("invalid Persist usage");
1040
1041 batch.into_transmittable_batch()
1042 }
1043}
1044
1045impl RustType<ProtoCompaction> for (GlobalId, Antichain<mz_repr::Timestamp>) {
1046 fn into_proto(&self) -> ProtoCompaction {
1047 ProtoCompaction {
1048 id: Some(self.0.into_proto()),
1049 frontier: Some(self.1.into_proto()),
1050 }
1051 }
1052
1053 fn from_proto(proto: ProtoCompaction) -> Result<Self, TryFromProtoError> {
1054 Ok((
1055 proto.id.into_rust_if_some("ProtoCompaction::id")?,
1056 proto
1057 .frontier
1058 .into_rust_if_some("ProtoCompaction::frontier")?,
1059 ))
1060 }
1061}
1062
1063impl TryIntoProtocolNonce for StorageCommand {
1064 fn try_into_protocol_nonce(self) -> Result<Uuid, Self> {
1065 match self {
1066 StorageCommand::Hello { nonce } => Ok(nonce),
1067 cmd => Err(cmd),
1068 }
1069 }
1070}
1071
#[cfg(test)]
mod tests {
    use mz_ore::assert_ok;
    use mz_proto::protobuf_roundtrip;
    use proptest::prelude::ProptestConfig;
    use proptest::proptest;

    use super::*;

    /// Pins the in-memory size of `StorageCommand` so accidental growth is noticed.
    #[mz_ore::test]
    fn test_storage_command_size() {
        let size = std::mem::size_of::<StorageCommand>();
        assert_eq!(size, 40);
    }

    /// Pins the in-memory size of `StorageResponse` so accidental growth is noticed.
    #[mz_ore::test]
    fn test_storage_response_size() {
        let size = std::mem::size_of::<StorageResponse>();
        assert_eq!(size, 120);
    }

    proptest! {
        #![proptest_config(ProptestConfig::with_cases(32))]

        /// Commands survive a protobuf encode/decode roundtrip unchanged.
        #[mz_ore::test]
        #[cfg_attr(miri, ignore)] // presumably skipped under Miri because it is slow
        fn storage_command_protobuf_roundtrip(expected in any::<StorageCommand<mz_repr::Timestamp>>()) {
            let decoded = protobuf_roundtrip::<_, ProtoStorageCommand>(&expected);
            assert_ok!(decoded);
            assert_eq!(decoded.unwrap(), expected);
        }

        /// Responses survive a protobuf encode/decode roundtrip unchanged.
        #[mz_ore::test]
        #[cfg_attr(miri, ignore)] // presumably skipped under Miri because it is slow
        fn storage_response_protobuf_roundtrip(expected in any::<StorageResponse<mz_repr::Timestamp>>()) {
            let decoded = protobuf_roundtrip::<_, ProtoStorageResponse>(&expected);
            assert_ok!(decoded);
            assert_eq!(decoded.unwrap(), expected);
        }
    }
}