1#![allow(missing_docs)]
11#![allow(clippy::as_conversions, clippy::clone_on_ref_ptr)]
14
15use std::collections::{BTreeMap, BTreeSet};
18use std::fmt::Debug;
19use std::iter;
20
21use async_trait::async_trait;
22use differential_dataflow::difference::Semigroup;
23use differential_dataflow::lattice::Lattice;
24use mz_cluster_client::ReplicaId;
25use mz_cluster_client::client::{ClusterStartupEpoch, TimelyConfig, TryIntoTimelyConfig};
26use mz_ore::assert_none;
27use mz_persist_client::batch::{BatchBuilder, ProtoBatch};
28use mz_persist_client::write::WriteHandle;
29use mz_persist_types::{Codec, Codec64, StepForward};
30use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError};
31use mz_repr::{Diff, GlobalId, Row, TimestampManipulation};
32use mz_service::client::{GenericClient, Partitionable, PartitionedState};
33use mz_service::grpc::{GrpcClient, GrpcServer, ProtoServiceTypes, ResponseStream};
34use mz_storage_types::controller::CollectionMetadata;
35use mz_storage_types::oneshot_sources::OneshotIngestionRequest;
36use mz_storage_types::parameters::StorageParameters;
37use mz_storage_types::sinks::StorageSinkDesc;
38use mz_storage_types::sources::IngestionDescription;
39use mz_timely_util::progress::any_antichain;
40use proptest::prelude::{Arbitrary, any};
41use proptest::strategy::{BoxedStrategy, Strategy, Union};
42use serde::{Deserialize, Serialize};
43use smallvec::SmallVec;
44use timely::PartialOrder;
45use timely::progress::Timestamp;
46use timely::progress::frontier::{Antichain, MutableAntichain};
47use tonic::{Request, Status as TonicStatus, Streaming};
48use uuid::Uuid;
49
50use crate::client::proto_storage_server::ProtoStorage;
51use crate::metrics::ReplicaMetrics;
52use crate::statistics::{SinkStatisticsUpdate, SourceStatisticsUpdate};
53
54include!(concat!(env!("OUT_DIR"), "/mz_storage_client.client.rs"));
55
56pub trait StorageClient<T = mz_repr::Timestamp>:
58 GenericClient<StorageCommand<T>, StorageResponse<T>>
59{
60}
61
62impl<C, T> StorageClient<T> for C where C: GenericClient<StorageCommand<T>, StorageResponse<T>> {}
63
64#[async_trait]
65impl<T: Send> GenericClient<StorageCommand<T>, StorageResponse<T>> for Box<dyn StorageClient<T>> {
66 async fn send(&mut self, cmd: StorageCommand<T>) -> Result<(), anyhow::Error> {
67 (**self).send(cmd).await
68 }
69
70 async fn recv(&mut self) -> Result<Option<StorageResponse<T>>, anyhow::Error> {
76 (**self).recv().await
78 }
79}
80
81#[derive(Debug, Clone)]
82pub enum StorageProtoServiceTypes {}
83
84impl ProtoServiceTypes for StorageProtoServiceTypes {
85 type PC = ProtoStorageCommand;
86 type PR = ProtoStorageResponse;
87 type STATS = ReplicaMetrics;
88 const URL: &'static str = "/mz_storage_client.client.ProtoStorage/CommandResponseStream";
89}
90
91pub type StorageGrpcClient = GrpcClient<StorageProtoServiceTypes>;
92
93#[async_trait]
94impl<F, G> ProtoStorage for GrpcServer<F>
95where
96 F: Fn() -> G + Send + Sync + 'static,
97 G: StorageClient + 'static,
98{
99 type CommandResponseStreamStream = ResponseStream<ProtoStorageResponse>;
100
101 async fn command_response_stream(
102 &self,
103 request: Request<Streaming<ProtoStorageCommand>>,
104 ) -> Result<tonic::Response<Self::CommandResponseStreamStream>, TonicStatus> {
105 self.forward_bidi_stream(request).await
106 }
107}
108
109#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
111pub enum StorageCommand<T = mz_repr::Timestamp> {
112 CreateTimely {
115 config: TimelyConfig,
116 epoch: ClusterStartupEpoch,
117 },
118 InitializationComplete,
121 AllowWrites,
128 UpdateConfiguration(StorageParameters),
130 RunIngestion(RunIngestionCommand),
132 AllowCompaction(GlobalId, Antichain<T>),
136 RunSink(RunSinkCommand<T>),
137 RunOneshotIngestion(Vec<RunOneshotIngestion>),
143 CancelOneshotIngestion {
152 ingestions: Vec<Uuid>,
153 },
154}
155
156impl<T> StorageCommand<T> {
157 pub fn installs_objects(&self) -> bool {
159 use StorageCommand::*;
160 match self {
161 CreateTimely { .. }
162 | InitializationComplete
163 | AllowWrites
164 | UpdateConfiguration(_)
165 | AllowCompaction(_, _)
166 | CancelOneshotIngestion { .. } => false,
167 RunIngestion(_) | RunSink(_) | RunOneshotIngestion(_) => true,
171 }
172 }
173}
174
175#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
177pub struct RunIngestionCommand {
178 pub id: GlobalId,
180 pub description: IngestionDescription<CollectionMetadata>,
183}
184
185impl Arbitrary for RunIngestionCommand {
186 type Strategy = BoxedStrategy<Self>;
187 type Parameters = ();
188
189 fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
190 (
191 any::<GlobalId>(),
192 any::<IngestionDescription<CollectionMetadata>>(),
193 )
194 .prop_map(|(id, description)| Self { id, description })
195 .boxed()
196 }
197}
198
199impl RustType<ProtoRunIngestionCommand> for RunIngestionCommand {
200 fn into_proto(&self) -> ProtoRunIngestionCommand {
201 ProtoRunIngestionCommand {
202 id: Some(self.id.into_proto()),
203 description: Some(self.description.into_proto()),
204 }
205 }
206
207 fn from_proto(proto: ProtoRunIngestionCommand) -> Result<Self, TryFromProtoError> {
208 Ok(RunIngestionCommand {
209 id: proto.id.into_rust_if_some("ProtoRunIngestionCommand::id")?,
210 description: proto
211 .description
212 .into_rust_if_some("ProtoRunIngestionCommand::description")?,
213 })
214 }
215}
216
217#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
219pub struct RunOneshotIngestion {
220 pub ingestion_id: uuid::Uuid,
222 pub collection_id: GlobalId,
224 pub collection_meta: CollectionMetadata,
226 pub request: OneshotIngestionRequest,
228}
229
230impl RustType<ProtoRunOneshotIngestion> for RunOneshotIngestion {
231 fn into_proto(&self) -> ProtoRunOneshotIngestion {
232 ProtoRunOneshotIngestion {
233 ingestion_id: Some(self.ingestion_id.into_proto()),
234 collection_id: Some(self.collection_id.into_proto()),
235 storage_metadata: Some(self.collection_meta.into_proto()),
236 request: Some(self.request.into_proto()),
237 }
238 }
239
240 fn from_proto(proto: ProtoRunOneshotIngestion) -> Result<Self, TryFromProtoError> {
241 Ok(RunOneshotIngestion {
242 ingestion_id: proto
243 .ingestion_id
244 .into_rust_if_some("ProtoRunOneshotIngestion::ingestion_id")?,
245 collection_id: proto
246 .collection_id
247 .into_rust_if_some("ProtoRunOneshotIngestion::collection_id")?,
248 collection_meta: proto
249 .storage_metadata
250 .into_rust_if_some("ProtoRunOneshotIngestion::storage_metadata")?,
251 request: proto
252 .request
253 .into_rust_if_some("ProtoRunOneshotIngestion::request")?,
254 })
255 }
256}
257
258impl RustType<ProtoRunSinkCommand> for RunSinkCommand<mz_repr::Timestamp> {
259 fn into_proto(&self) -> ProtoRunSinkCommand {
260 ProtoRunSinkCommand {
261 id: Some(self.id.into_proto()),
262 description: Some(self.description.into_proto()),
263 }
264 }
265
266 fn from_proto(proto: ProtoRunSinkCommand) -> Result<Self, TryFromProtoError> {
267 Ok(RunSinkCommand {
268 id: proto.id.into_rust_if_some("ProtoRunSinkCommand::id")?,
269 description: proto
270 .description
271 .into_rust_if_some("ProtoRunSinkCommand::description")?,
272 })
273 }
274}
275
276#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
278pub struct RunSinkCommand<T> {
279 pub id: GlobalId,
280 pub description: StorageSinkDesc<CollectionMetadata, T>,
281}
282
283impl Arbitrary for RunSinkCommand<mz_repr::Timestamp> {
284 type Strategy = BoxedStrategy<Self>;
285 type Parameters = ();
286
287 fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
288 (
289 any::<GlobalId>(),
290 any::<StorageSinkDesc<CollectionMetadata, mz_repr::Timestamp>>(),
291 )
292 .prop_map(|(id, description)| Self { id, description })
293 .boxed()
294 }
295}
296
297impl RustType<ProtoStorageCommand> for StorageCommand<mz_repr::Timestamp> {
298 fn into_proto(&self) -> ProtoStorageCommand {
299 use proto_storage_command::Kind::*;
300 use proto_storage_command::*;
301 ProtoStorageCommand {
302 kind: Some(match self {
303 StorageCommand::CreateTimely { config, epoch } => CreateTimely(ProtoCreateTimely {
304 config: Some(config.into_proto()),
305 epoch: Some(epoch.into_proto()),
306 }),
307 StorageCommand::InitializationComplete => InitializationComplete(()),
308 StorageCommand::AllowWrites => AllowWrites(()),
309 StorageCommand::UpdateConfiguration(params) => {
310 UpdateConfiguration(params.into_proto())
311 }
312 StorageCommand::AllowCompaction(id, frontier) => AllowCompaction(ProtoCompaction {
313 id: Some(id.into_proto()),
314 frontier: Some(frontier.into_proto()),
315 }),
316 StorageCommand::RunIngestion(ingestion) => RunIngestion(ingestion.into_proto()),
317 StorageCommand::RunSink(sink) => RunSink(sink.into_proto()),
318 StorageCommand::RunOneshotIngestion(ingestions) => {
319 RunOneshotIngestions(ProtoRunOneshotIngestionsCommand {
320 ingestions: ingestions.iter().map(|cmd| cmd.into_proto()).collect(),
321 })
322 }
323 StorageCommand::CancelOneshotIngestion { ingestions } => {
324 CancelOneshotIngestions(ProtoCancelOneshotIngestionsCommand {
325 ingestions: ingestions.iter().map(|uuid| uuid.into_proto()).collect(),
326 })
327 }
328 }),
329 }
330 }
331
332 fn from_proto(proto: ProtoStorageCommand) -> Result<Self, TryFromProtoError> {
333 use proto_storage_command::Kind::*;
334 use proto_storage_command::*;
335 match proto.kind {
336 Some(CreateTimely(ProtoCreateTimely { config, epoch })) => {
337 Ok(StorageCommand::CreateTimely {
338 config: config.into_rust_if_some("ProtoCreateTimely::config")?,
339 epoch: epoch.into_rust_if_some("ProtoCreateTimely::epoch")?,
340 })
341 }
342 Some(InitializationComplete(())) => Ok(StorageCommand::InitializationComplete),
343 Some(AllowWrites(())) => Ok(StorageCommand::AllowWrites),
344 Some(UpdateConfiguration(params)) => {
345 Ok(StorageCommand::UpdateConfiguration(params.into_rust()?))
346 }
347 Some(RunIngestion(ingestion)) => {
348 Ok(StorageCommand::RunIngestion(ingestion.into_rust()?))
349 }
350 Some(AllowCompaction(ProtoCompaction { id, frontier })) => {
351 Ok(StorageCommand::AllowCompaction(
352 id.into_rust_if_some("ProtoCompaction::id")?,
353 frontier.into_rust_if_some("ProtoCompaction::frontier")?,
354 ))
355 }
356 Some(RunSink(sink)) => Ok(StorageCommand::RunSink(sink.into_rust()?)),
357 Some(RunOneshotIngestions(oneshot)) => {
358 let ingestions = oneshot
359 .ingestions
360 .into_iter()
361 .map(|cmd| cmd.into_rust())
362 .collect::<Result<_, _>>()?;
363 Ok(StorageCommand::RunOneshotIngestion(ingestions))
364 }
365 Some(CancelOneshotIngestions(oneshot)) => {
366 let ingestions = oneshot
367 .ingestions
368 .into_iter()
369 .map(|uuid| uuid.into_rust())
370 .collect::<Result<_, _>>()?;
371 Ok(StorageCommand::CancelOneshotIngestion { ingestions })
372 }
373 None => Err(TryFromProtoError::missing_field(
374 "ProtoStorageCommand::kind",
375 )),
376 }
377 }
378}
379
380impl Arbitrary for StorageCommand<mz_repr::Timestamp> {
381 type Strategy = Union<BoxedStrategy<Self>>;
382 type Parameters = ();
383
384 fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
385 Union::new(vec![
386 any::<RunIngestionCommand>()
388 .prop_map(StorageCommand::RunIngestion)
389 .boxed(),
390 any::<RunSinkCommand<mz_repr::Timestamp>>()
391 .prop_map(StorageCommand::RunSink)
392 .boxed(),
393 (any::<GlobalId>(), any_antichain())
394 .prop_map(|(id, frontier)| StorageCommand::AllowCompaction(id, frontier))
395 .boxed(),
396 ])
397 }
398}
399
400#[derive(Copy, Clone, Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
402pub enum Status {
403 Starting,
404 Running,
405 Paused,
406 Stalled,
407 Ceased,
410 Dropped,
411}
412
413impl std::str::FromStr for Status {
414 type Err = anyhow::Error;
415 fn from_str(s: &str) -> Result<Self, Self::Err> {
417 Ok(match s {
418 "starting" => Status::Starting,
419 "running" => Status::Running,
420 "paused" => Status::Paused,
421 "stalled" => Status::Stalled,
422 "ceased" => Status::Ceased,
423 "dropped" => Status::Dropped,
424 s => return Err(anyhow::anyhow!("{} is not a valid status", s)),
425 })
426 }
427}
428
429impl Status {
430 pub fn to_str(&self) -> &'static str {
432 match self {
433 Status::Starting => "starting",
434 Status::Running => "running",
435 Status::Paused => "paused",
436 Status::Stalled => "stalled",
437 Status::Ceased => "ceased",
438 Status::Dropped => "dropped",
439 }
440 }
441
442 pub fn superseded_by(self, new: Status) -> bool {
445 match (self, new) {
446 (_, Status::Dropped) => true,
447 (Status::Dropped, _) => false,
448 (Status::Paused, Status::Paused) => false,
450 _ => true,
453 }
454 }
455}
456
457#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
463pub struct StatusUpdate {
464 pub id: GlobalId,
465 pub status: Status,
466 pub timestamp: chrono::DateTime<chrono::Utc>,
467 pub error: Option<String>,
468 pub hints: BTreeSet<String>,
469 pub namespaced_errors: BTreeMap<String, String>,
470 pub replica_id: Option<ReplicaId>,
471}
472
473impl StatusUpdate {
474 pub fn new(
475 id: GlobalId,
476 timestamp: chrono::DateTime<chrono::Utc>,
477 status: Status,
478 ) -> StatusUpdate {
479 StatusUpdate {
480 id,
481 timestamp,
482 status,
483 error: None,
484 hints: Default::default(),
485 namespaced_errors: Default::default(),
486 replica_id: None,
487 }
488 }
489}
490
491impl From<StatusUpdate> for Row {
492 fn from(update: StatusUpdate) -> Self {
493 use mz_repr::Datum;
494
495 let timestamp = Datum::TimestampTz(update.timestamp.try_into().expect("must fit"));
496 let id = update.id.to_string();
497 let id = Datum::String(&id);
498 let status = Datum::String(update.status.to_str());
499 let error = update.error.as_deref().into();
500
501 let mut row = Row::default();
502 let mut packer = row.packer();
503 packer.extend([timestamp, id, status, error]);
504
505 if !update.hints.is_empty() || !update.namespaced_errors.is_empty() {
506 packer.push_dict_with(|dict_packer| {
507 if !update.hints.is_empty() {
510 dict_packer.push(Datum::String("hints"));
511 dict_packer.push_list(update.hints.iter().map(|s| Datum::String(s)));
512 }
513 if !update.namespaced_errors.is_empty() {
514 dict_packer.push(Datum::String("namespaced"));
515 dict_packer.push_dict(
516 update
517 .namespaced_errors
518 .iter()
519 .map(|(k, v)| (k.as_str(), Datum::String(v))),
520 );
521 }
522 });
523 } else {
524 packer.push(Datum::Null);
525 }
526
527 match update.replica_id {
528 Some(id) => packer.push(Datum::String(&id.to_string())),
529 None => packer.push(Datum::Null),
530 }
531
532 row
533 }
534}
535
536impl RustType<proto_storage_response::ProtoStatus> for Status {
537 fn into_proto(&self) -> proto_storage_response::ProtoStatus {
538 use proto_storage_response::proto_status::*;
539
540 proto_storage_response::ProtoStatus {
541 kind: Some(match self {
542 Status::Starting => Kind::Starting(()),
543 Status::Running => Kind::Running(()),
544 Status::Paused => Kind::Paused(()),
545 Status::Stalled => Kind::Stalled(()),
546 Status::Ceased => Kind::Ceased(()),
547 Status::Dropped => Kind::Dropped(()),
548 }),
549 }
550 }
551
552 fn from_proto(proto: proto_storage_response::ProtoStatus) -> Result<Self, TryFromProtoError> {
553 use proto_storage_response::proto_status::*;
554 let kind = proto
555 .kind
556 .ok_or_else(|| TryFromProtoError::missing_field("ProtoStatus::kind"))?;
557
558 Ok(match kind {
559 Kind::Starting(()) => Status::Starting,
560 Kind::Running(()) => Status::Running,
561 Kind::Paused(()) => Status::Paused,
562 Kind::Stalled(()) => Status::Stalled,
563 Kind::Ceased(()) => Status::Ceased,
564 Kind::Dropped(()) => Status::Dropped,
565 })
566 }
567}
568
569impl RustType<proto_storage_response::ProtoStatusUpdate> for StatusUpdate {
570 fn into_proto(&self) -> proto_storage_response::ProtoStatusUpdate {
571 proto_storage_response::ProtoStatusUpdate {
572 id: Some(self.id.into_proto()),
573 status: Some(self.status.into_proto()),
574 timestamp: Some(self.timestamp.into_proto()),
575 error: self.error.clone(),
576 hints: self.hints.iter().cloned().collect(),
577 namespaced_errors: self.namespaced_errors.clone(),
578 replica_id: self.replica_id.map(|id| id.to_string().into_proto()),
579 }
580 }
581
582 fn from_proto(
583 proto: proto_storage_response::ProtoStatusUpdate,
584 ) -> Result<Self, TryFromProtoError> {
585 Ok(StatusUpdate {
586 id: proto.id.into_rust_if_some("ProtoStatusUpdate::id")?,
587 timestamp: proto
588 .timestamp
589 .into_rust_if_some("ProtoStatusUpdate::timestamp")?,
590 status: proto
591 .status
592 .into_rust_if_some("ProtoStatusUpdate::status")?,
593 error: proto.error,
594 hints: proto.hints.into_iter().collect(),
595 namespaced_errors: proto.namespaced_errors,
596 replica_id: proto
597 .replica_id
598 .map(|replica_id: String| replica_id.parse().expect("must be a valid replica id")),
599 })
600 }
601}
602
603pub enum AppendOnlyUpdate {
605 Row((Row, Diff)),
606 Status(StatusUpdate),
607}
608
609impl AppendOnlyUpdate {
610 pub fn into_row(self) -> (Row, Diff) {
611 match self {
612 AppendOnlyUpdate::Row((row, diff)) => (row, diff),
613 AppendOnlyUpdate::Status(status) => (Row::from(status), Diff::ONE),
614 }
615 }
616}
617
618impl From<(Row, Diff)> for AppendOnlyUpdate {
619 fn from((row, diff): (Row, Diff)) -> Self {
620 Self::Row((row, diff))
621 }
622}
623
624impl From<StatusUpdate> for AppendOnlyUpdate {
625 fn from(update: StatusUpdate) -> Self {
626 Self::Status(update)
627 }
628}
629
630#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
632pub enum StorageResponse<T = mz_repr::Timestamp> {
633 FrontierUpper(GlobalId, Antichain<T>),
635 DroppedId(GlobalId),
637 StagedBatches(BTreeMap<uuid::Uuid, Vec<Result<ProtoBatch, String>>>),
639 StatisticsUpdates(Vec<SourceStatisticsUpdate>, Vec<SinkStatisticsUpdate>),
641 StatusUpdate(StatusUpdate),
644}
645
646impl RustType<ProtoStorageResponse> for StorageResponse<mz_repr::Timestamp> {
647 fn into_proto(&self) -> ProtoStorageResponse {
648 use proto_storage_response::Kind::*;
649 use proto_storage_response::{
650 ProtoDroppedId, ProtoFrontierUpper, ProtoStagedBatches, ProtoStatisticsUpdates,
651 };
652 ProtoStorageResponse {
653 kind: Some(match self {
654 StorageResponse::FrontierUpper(id, upper) => FrontierUpper(ProtoFrontierUpper {
655 id: Some(id.into_proto()),
656 upper: Some(upper.into_proto()),
657 }),
658 StorageResponse::DroppedId(id) => DroppedId(ProtoDroppedId {
659 id: Some(id.into_proto()),
660 }),
661 StorageResponse::StatisticsUpdates(source_stats, sink_stats) => {
662 Stats(ProtoStatisticsUpdates {
663 source_updates: source_stats
664 .iter()
665 .map(|update| update.into_proto())
666 .collect(),
667 sink_updates: sink_stats
668 .iter()
669 .map(|update| update.into_proto())
670 .collect(),
671 })
672 }
673 StorageResponse::StatusUpdate(update) => StatusUpdate(update.into_proto()),
674 StorageResponse::StagedBatches(staged) => {
675 let batches = staged
676 .into_iter()
677 .map(|(collection_id, batches)| {
678 let batches = batches
679 .into_iter()
680 .map(|result| {
681 use proto_storage_response::proto_staged_batches::batch_result::Value;
682 let value = match result {
683 Ok(batch) => Value::Batch(batch.clone()),
684 Err(err) => Value::Error(err.clone()),
685 };
686 proto_storage_response::proto_staged_batches::BatchResult { value: Some(value) }
687 })
688 .collect();
689 proto_storage_response::proto_staged_batches::Inner {
690 id: Some(collection_id.into_proto()),
691 batches,
692 }
693 })
694 .collect();
695 StagedBatches(ProtoStagedBatches { batches })
696 }
697 }),
698 }
699 }
700
701 fn from_proto(proto: ProtoStorageResponse) -> Result<Self, TryFromProtoError> {
702 use proto_storage_response::Kind::*;
703 use proto_storage_response::{ProtoDroppedId, ProtoFrontierUpper};
704 match proto.kind {
705 Some(DroppedId(ProtoDroppedId { id })) => Ok(StorageResponse::DroppedId(
706 id.into_rust_if_some("ProtoDroppedId::id")?,
707 )),
708 Some(FrontierUpper(ProtoFrontierUpper { id, upper })) => {
709 Ok(StorageResponse::FrontierUpper(
710 id.into_rust_if_some("ProtoFrontierUpper::id")?,
711 upper.into_rust_if_some("ProtoFrontierUpper::upper")?,
712 ))
713 }
714 Some(Stats(stats)) => Ok(StorageResponse::StatisticsUpdates(
715 stats
716 .source_updates
717 .into_iter()
718 .map(|update| update.into_rust())
719 .collect::<Result<Vec<_>, TryFromProtoError>>()?,
720 stats
721 .sink_updates
722 .into_iter()
723 .map(|update| update.into_rust())
724 .collect::<Result<Vec<_>, TryFromProtoError>>()?,
725 )),
726 Some(StatusUpdate(update)) => Ok(StorageResponse::StatusUpdate(update.into_rust()?)),
727 Some(StagedBatches(staged)) => {
728 let batches: BTreeMap<_, _> = staged
729 .batches
730 .into_iter()
731 .map(|inner| {
732 let id = inner
733 .id
734 .into_rust_if_some("ProtoStagedBatches::Inner::id")?;
735
736 let mut batches = Vec::with_capacity(inner.batches.len());
737 for maybe_batch in inner.batches {
738 use proto_storage_response::proto_staged_batches::batch_result::Value;
739
740 let value = maybe_batch.value.ok_or_else(|| {
741 TryFromProtoError::missing_field("BatchResult::value")
742 })?;
743 let batch = match value {
744 Value::Batch(batch) => Ok(batch),
745 Value::Error(err) => Err(err),
746 };
747 batches.push(batch);
748 }
749
750 Ok::<_, TryFromProtoError>((id, batches))
751 })
752 .collect::<Result<_, _>>()?;
753
754 Ok(StorageResponse::StagedBatches(batches))
755 }
756 None => Err(TryFromProtoError::missing_field(
757 "ProtoStorageResponse::kind",
758 )),
759 }
760 }
761}
762
763impl Arbitrary for StorageResponse<mz_repr::Timestamp> {
764 type Strategy = Union<BoxedStrategy<Self>>;
765 type Parameters = ();
766
767 fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
768 Union::new(vec![
770 (any::<GlobalId>(), any_antichain())
771 .prop_map(|(id, upper)| StorageResponse::FrontierUpper(id, upper))
772 .boxed(),
773 ])
774 }
775}
776
777#[derive(Debug)]
782pub struct PartitionedStorageState<T> {
783 parts: usize,
785 uppers: BTreeMap<GlobalId, (MutableAntichain<T>, Vec<Option<Antichain<T>>>)>,
788 oneshot_source_responses:
790 BTreeMap<uuid::Uuid, BTreeMap<usize, Vec<Result<ProtoBatch, String>>>>,
791}
792
793impl<T> Partitionable<StorageCommand<T>, StorageResponse<T>>
794 for (StorageCommand<T>, StorageResponse<T>)
795where
796 T: timely::progress::Timestamp + Lattice,
797{
798 type PartitionedState = PartitionedStorageState<T>;
799
800 fn new(parts: usize) -> PartitionedStorageState<T> {
801 PartitionedStorageState {
802 parts,
803 uppers: BTreeMap::new(),
804 oneshot_source_responses: BTreeMap::new(),
805 }
806 }
807}
808
809impl<T> PartitionedStorageState<T>
810where
811 T: timely::progress::Timestamp,
812{
813 fn observe_command(&mut self, command: &StorageCommand<T>) {
814 let _ = match command {
821 StorageCommand::CreateTimely { .. } => {
822 }
826 StorageCommand::RunIngestion(ingestion) => {
827 self.insert_new_uppers(ingestion.description.collection_ids());
828 }
829 StorageCommand::RunSink(export) => {
830 self.insert_new_uppers([export.id]);
831 }
832 StorageCommand::InitializationComplete
833 | StorageCommand::AllowWrites
834 | StorageCommand::UpdateConfiguration(_)
835 | StorageCommand::AllowCompaction(_, _)
836 | StorageCommand::RunOneshotIngestion(_)
837 | StorageCommand::CancelOneshotIngestion { .. } => {}
838 };
839 }
840
841 fn insert_new_uppers<I: IntoIterator<Item = GlobalId>>(&mut self, ids: I) {
847 for id in ids {
848 self.uppers.entry(id).or_insert_with(|| {
849 let mut frontier = MutableAntichain::new();
850 #[allow(clippy::as_conversions)]
853 frontier.update_iter(iter::once((T::minimum(), self.parts as i64)));
854 let part_frontiers = vec![Some(Antichain::from_elem(T::minimum())); self.parts];
855
856 (frontier, part_frontiers)
857 });
858 }
859 }
860}
861
862impl<T> PartitionedState<StorageCommand<T>, StorageResponse<T>> for PartitionedStorageState<T>
863where
864 T: timely::progress::Timestamp + Lattice,
865{
866 fn split_command(&mut self, command: StorageCommand<T>) -> Vec<Option<StorageCommand<T>>> {
867 self.observe_command(&command);
868
869 match command {
870 StorageCommand::CreateTimely { config, epoch } => {
871 let timely_cmds = config.split_command(self.parts);
872
873 let timely_cmds = timely_cmds
874 .into_iter()
875 .map(|config| Some(StorageCommand::CreateTimely { config, epoch }))
876 .collect();
877 timely_cmds
878 }
879 command => {
880 vec![Some(command); self.parts]
883 }
884 }
885 }
886
887 fn absorb_response(
888 &mut self,
889 shard_id: usize,
890 response: StorageResponse<T>,
891 ) -> Option<Result<StorageResponse<T>, anyhow::Error>> {
892 match response {
893 StorageResponse::FrontierUpper(id, new_shard_upper) => {
895 let (frontier, shard_frontiers) = match self.uppers.get_mut(&id) {
896 Some(value) => value,
897 None => panic!("Reference to absent collection: {id}"),
898 };
899 let old_upper = frontier.frontier().to_owned();
900 let shard_upper = match &mut shard_frontiers[shard_id] {
901 Some(shard_upper) => shard_upper,
902 None => panic!("Reference to absent shard {shard_id} for collection {id}"),
903 };
904 frontier.update_iter(shard_upper.iter().map(|t| (t.clone(), -1)));
905 frontier.update_iter(new_shard_upper.iter().map(|t| (t.clone(), 1)));
906 shard_upper.join_assign(&new_shard_upper);
907
908 let new_upper = frontier.frontier();
909 if PartialOrder::less_than(&old_upper.borrow(), &new_upper) {
910 Some(Ok(StorageResponse::FrontierUpper(id, new_upper.to_owned())))
911 } else {
912 None
913 }
914 }
915 StorageResponse::DroppedId(id) => {
916 let (_, shard_frontiers) = match self.uppers.get_mut(&id) {
917 Some(value) => value,
918 None => panic!("Reference to absent collection: {id}"),
919 };
920 let prev = shard_frontiers[shard_id].take();
921 assert!(
922 prev.is_some(),
923 "got double drop for {id} from shard {shard_id}"
924 );
925
926 if shard_frontiers.iter().all(Option::is_none) {
927 self.uppers.remove(&id);
928 Some(Ok(StorageResponse::DroppedId(id)))
929 } else {
930 None
931 }
932 }
933 StorageResponse::StatisticsUpdates(source_stats, sink_stats) => {
934 Some(Ok(StorageResponse::StatisticsUpdates(
938 source_stats,
939 sink_stats,
940 )))
941 }
942 StorageResponse::StatusUpdate(updates) => {
943 Some(Ok(StorageResponse::StatusUpdate(updates)))
944 }
945 StorageResponse::StagedBatches(batches) => {
946 let mut finished_batches = BTreeMap::new();
947
948 for (collection_id, batches) in batches {
949 tracing::info!(%shard_id, %collection_id, "got batch");
950
951 let entry = self
952 .oneshot_source_responses
953 .entry(collection_id)
954 .or_default();
955 let novel = entry.insert(shard_id, batches);
956 assert_none!(novel, "Duplicate oneshot source response");
957
958 if entry.len() == self.parts {
960 let entry = self
961 .oneshot_source_responses
962 .remove(&collection_id)
963 .expect("checked above");
964 let all_batches: Vec<_> = entry.into_values().flatten().collect();
965
966 finished_batches.insert(collection_id, all_batches);
967 }
968 }
969
970 if !finished_batches.is_empty() {
971 Some(Ok(StorageResponse::StagedBatches(finished_batches)))
972 } else {
973 None
974 }
975 }
976 }
977 }
978}
979
980#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
981pub struct Update<T = mz_repr::Timestamp> {
983 pub row: Row,
984 pub timestamp: T,
985 pub diff: Diff,
986}
987
988#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
989pub struct TimestamplessUpdate {
994 pub row: Row,
995 pub diff: Diff,
996}
997
998#[derive(Debug, Clone, PartialEq)]
999pub enum TableData {
1000 Rows(Vec<(Row, Diff)>),
1004 Batches(SmallVec<[ProtoBatch; 1]>),
1006}
1007
1008impl TableData {
1009 pub fn is_empty(&self) -> bool {
1010 match self {
1011 TableData::Rows(rows) => rows.is_empty(),
1012 TableData::Batches(batches) => batches.is_empty(),
1013 }
1014 }
1015}
1016
1017pub struct TimestamplessUpdateBuilder<K, V, T, D>
1020where
1021 K: Codec,
1022 V: Codec,
1023 T: Timestamp + Lattice + Codec64,
1024 D: Codec64,
1025{
1026 builder: BatchBuilder<K, V, T, D>,
1027 initial_ts: T,
1028}
1029
1030impl<K, V, T, D> TimestamplessUpdateBuilder<K, V, T, D>
1031where
1032 K: Debug + Codec,
1033 V: Debug + Codec,
1034 T: TimestampManipulation + Lattice + Codec64 + Sync,
1035 D: Semigroup + Ord + Codec64 + Send + Sync,
1036{
1037 pub fn new(handle: &WriteHandle<K, V, T, D>) -> Self {
1040 let initial_ts = T::minimum();
1041 let builder = handle.builder(Antichain::from_elem(initial_ts.clone()));
1042 TimestamplessUpdateBuilder {
1043 builder,
1044 initial_ts,
1045 }
1046 }
1047
1048 pub async fn add(&mut self, k: &K, v: &V, d: &D) {
1050 self.builder
1051 .add(k, v, &self.initial_ts, d)
1052 .await
1053 .expect("invalid Persist usage");
1054 }
1055
1056 pub async fn finish(self) -> ProtoBatch {
1061 let finish_ts = StepForward::step_forward(&self.initial_ts);
1062 let batch = self
1063 .builder
1064 .finish(Antichain::from_elem(finish_ts))
1065 .await
1066 .expect("invalid Persist usage");
1067
1068 batch.into_transmittable_batch()
1069 }
1070}
1071
1072impl RustType<ProtoCompaction> for (GlobalId, Antichain<mz_repr::Timestamp>) {
1073 fn into_proto(&self) -> ProtoCompaction {
1074 ProtoCompaction {
1075 id: Some(self.0.into_proto()),
1076 frontier: Some(self.1.into_proto()),
1077 }
1078 }
1079
1080 fn from_proto(proto: ProtoCompaction) -> Result<Self, TryFromProtoError> {
1081 Ok((
1082 proto.id.into_rust_if_some("ProtoCompaction::id")?,
1083 proto
1084 .frontier
1085 .into_rust_if_some("ProtoCompaction::frontier")?,
1086 ))
1087 }
1088}
1089
1090impl TryIntoTimelyConfig for StorageCommand {
1091 fn try_into_timely_config(self) -> Result<(TimelyConfig, ClusterStartupEpoch), Self> {
1092 match self {
1093 StorageCommand::CreateTimely { config, epoch } => Ok((config, epoch)),
1094 cmd => Err(cmd),
1095 }
1096 }
1097}
1098
#[cfg(test)]
mod tests {
    use mz_ore::assert_ok;
    use mz_proto::protobuf_roundtrip;
    use proptest::prelude::ProptestConfig;
    use proptest::proptest;

    use super::*;

    proptest! {
        #![proptest_config(ProptestConfig::with_cases(32))]

        // A generated command must survive encoding to protobuf and back.
        #[mz_ore::test]
        #[cfg_attr(miri, ignore)]
        fn storage_command_protobuf_roundtrip(expect in any::<StorageCommand<mz_repr::Timestamp>>()) {
            let roundtripped = protobuf_roundtrip::<_, ProtoStorageCommand>(&expect);
            assert_ok!(roundtripped);
            assert_eq!(roundtripped.unwrap(), expect);
        }

        // A generated response must survive encoding to protobuf and back.
        #[mz_ore::test]
        #[cfg_attr(miri, ignore)]
        fn storage_response_protobuf_roundtrip(expect in any::<StorageResponse<mz_repr::Timestamp>>()) {
            let roundtripped = protobuf_roundtrip::<_, ProtoStorageResponse>(&expect);
            assert_ok!(roundtripped);
            assert_eq!(roundtripped.unwrap(), expect);
        }
    }
}