1#![allow(missing_docs)]
11#![allow(clippy::as_conversions, clippy::clone_on_ref_ptr)]
14
15use std::collections::{BTreeMap, BTreeSet};
18use std::fmt::Debug;
19use std::iter;
20
21use async_trait::async_trait;
22use differential_dataflow::difference::Semigroup;
23use differential_dataflow::lattice::Lattice;
24use mz_cluster_client::ReplicaId;
25use mz_cluster_client::client::{ClusterStartupEpoch, TimelyConfig, TryIntoTimelyConfig};
26use mz_ore::assert_none;
27use mz_persist_client::batch::{BatchBuilder, ProtoBatch};
28use mz_persist_client::write::WriteHandle;
29use mz_persist_types::{Codec, Codec64, StepForward};
30use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError};
31use mz_repr::{Diff, GlobalId, Row, TimestampManipulation};
32use mz_service::client::{GenericClient, Partitionable, PartitionedState};
33use mz_service::grpc::{GrpcClient, GrpcServer, ProtoServiceTypes, ResponseStream};
34use mz_storage_types::controller::CollectionMetadata;
35use mz_storage_types::oneshot_sources::OneshotIngestionRequest;
36use mz_storage_types::parameters::StorageParameters;
37use mz_storage_types::sinks::StorageSinkDesc;
38use mz_storage_types::sources::IngestionDescription;
39use mz_timely_util::progress::any_antichain;
40use proptest::prelude::{Arbitrary, any};
41use proptest::strategy::{BoxedStrategy, Strategy, Union};
42use serde::{Deserialize, Serialize};
43use smallvec::SmallVec;
44use timely::PartialOrder;
45use timely::progress::Timestamp;
46use timely::progress::frontier::{Antichain, MutableAntichain};
47use tonic::{Request, Status as TonicStatus, Streaming};
48use uuid::Uuid;
49
50use crate::client::proto_storage_server::ProtoStorage;
51use crate::metrics::ReplicaMetrics;
52use crate::statistics::{SinkStatisticsUpdate, SourceStatisticsUpdate};
53
54include!(concat!(env!("OUT_DIR"), "/mz_storage_client.client.rs"));
55
/// A client to a storage server.
///
/// This is a marker trait: every [`GenericClient`] that sends
/// [`StorageCommand`]s and receives [`StorageResponse`]s is a `StorageClient`
/// through the blanket impl that follows.
pub trait StorageClient<T = mz_repr::Timestamp>:
    GenericClient<StorageCommand<T>, StorageResponse<T>>
{
}

impl<C, T> StorageClient<T> for C where C: GenericClient<StorageCommand<T>, StorageResponse<T>> {}

/// Makes a boxed `StorageClient` trait object usable as a `GenericClient` by
/// forwarding every call to the boxed client.
#[async_trait]
impl<T: Send> GenericClient<StorageCommand<T>, StorageResponse<T>> for Box<dyn StorageClient<T>> {
    async fn send(&mut self, cmd: StorageCommand<T>) -> Result<(), anyhow::Error> {
        // `**self` is the inner `dyn StorageClient<T>`. Calling `send` on `self`
        // directly would resolve to this very impl and recurse forever.
        (**self).send(cmd).await
    }

    /// Receives the next response from the boxed client.
    ///
    /// NOTE(review): this only awaits the inner client's `recv`, so it is
    /// presumably exactly as cancel safe as that method — confirm.
    async fn recv(&mut self) -> Result<Option<StorageResponse<T>>, anyhow::Error> {
        (**self).recv().await
    }
}
80
/// Type-level marker that bundles the protobuf command/response types, the
/// metrics type, and the RPC path of the storage gRPC service.
///
/// It has no variants, so it can never be instantiated.
#[derive(Debug, Clone)]
pub enum StorageProtoServiceTypes {}

impl ProtoServiceTypes for StorageProtoServiceTypes {
    type PC = ProtoStorageCommand;
    type PR = ProtoStorageResponse;
    type STATS = ReplicaMetrics;
    // Must match the generated `ProtoStorage` service's streaming method path.
    const URL: &'static str = "/mz_storage_client.client.ProtoStorage/CommandResponseStream";
}

/// A gRPC client that speaks the storage protocol.
pub type StorageGrpcClient = GrpcClient<StorageProtoServiceTypes>;

/// Serves the `ProtoStorage` gRPC service from a `GrpcServer` whose factory
/// `F` produces a [`StorageClient`] `G`.
#[async_trait]
impl<F, G> ProtoStorage for GrpcServer<F>
where
    F: Fn() -> G + Send + Sync + 'static,
    G: StorageClient + 'static,
{
    type CommandResponseStreamStream = ResponseStream<ProtoStorageResponse>;

    /// Handles one bidirectional command/response stream by delegating to
    /// `GrpcServer::forward_bidi_stream`.
    async fn command_response_stream(
        &self,
        request: Request<Streaming<ProtoStorageCommand>>,
    ) -> Result<tonic::Response<Self::CommandResponseStreamStream>, TonicStatus> {
        self.forward_bidi_stream(request).await
    }
}
108
/// Commands sent to a storage replica.
///
/// NOTE(review): per-variant descriptions below are drawn from how this file
/// encodes, splits, and tracks each variant; the replica-side semantics are
/// not visible here — confirm against the storage replica implementation.
///
/// Variant order is part of the derived serde representation; do not reorder.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub enum StorageCommand<T = mz_repr::Timestamp> {
    /// Configures the Timely cluster. When commands are partitioned, the
    /// `config` is split per partition and every part shares the same `epoch`.
    CreateTimely {
        config: TimelyConfig,
        epoch: ClusterStartupEpoch,
    },
    /// Presumably marks the end of the initial command sequence (per the
    /// name) — confirm.
    InitializationComplete,
    /// Presumably permits the replica to perform writes (per the name) —
    /// confirm.
    AllowWrites,
    /// Updates the replica's [`StorageParameters`].
    UpdateConfiguration(StorageParameters),
    /// Runs the given ingestions; their collections start being tracked for
    /// frontier uppers.
    RunIngestions(Vec<RunIngestionCommand>),
    /// Per collection, a frontier — presumably the point up to which the
    /// collection may be compacted — confirm.
    AllowCompaction(Vec<(GlobalId, Antichain<T>)>),
    /// Runs the given sinks; each sink's ID starts being tracked for
    /// frontier uppers.
    RunSinks(Vec<RunSinkCommand<T>>),
    /// Runs the given oneshot ingestions.
    RunOneshotIngestion(Vec<RunOneshotIngestion>),
    /// Cancels the oneshot ingestions with the given IDs.
    CancelOneshotIngestion {
        ingestions: Vec<Uuid>,
    },
}
156
157impl<T> StorageCommand<T> {
158 pub fn installs_objects(&self) -> bool {
160 use StorageCommand::*;
161 match self {
162 CreateTimely { .. }
163 | InitializationComplete
164 | AllowWrites
165 | UpdateConfiguration(_)
166 | AllowCompaction(_)
167 | CancelOneshotIngestion { .. } => false,
168 RunIngestions(_) | RunSinks(_) | RunOneshotIngestion(_) => true,
172 }
173 }
174}
175
/// A request to run a single ingestion.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct RunIngestionCommand {
    /// The ID of the ingestion.
    pub id: GlobalId,
    /// The ingestion's description. When commands are partitioned, every ID
    /// returned by its `collection_ids()` starts being tracked for uppers.
    pub description: IngestionDescription<CollectionMetadata>,
}
185
186impl Arbitrary for RunIngestionCommand {
187 type Strategy = BoxedStrategy<Self>;
188 type Parameters = ();
189
190 fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
191 (
192 any::<GlobalId>(),
193 any::<IngestionDescription<CollectionMetadata>>(),
194 )
195 .prop_map(|(id, description)| Self { id, description })
196 .boxed()
197 }
198}
199
200impl RustType<ProtoRunIngestionCommand> for RunIngestionCommand {
201 fn into_proto(&self) -> ProtoRunIngestionCommand {
202 ProtoRunIngestionCommand {
203 id: Some(self.id.into_proto()),
204 description: Some(self.description.into_proto()),
205 }
206 }
207
208 fn from_proto(proto: ProtoRunIngestionCommand) -> Result<Self, TryFromProtoError> {
209 Ok(RunIngestionCommand {
210 id: proto.id.into_rust_if_some("ProtoRunIngestionCommand::id")?,
211 description: proto
212 .description
213 .into_rust_if_some("ProtoRunIngestionCommand::description")?,
214 })
215 }
216}
217
/// A request to run a single oneshot ingestion.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct RunOneshotIngestion {
    /// The ID of this ingestion. `CancelOneshotIngestion` identifies
    /// ingestions by `Uuid`, presumably this one — confirm.
    pub ingestion_id: uuid::Uuid,
    /// The ID of the collection the ingestion targets.
    pub collection_id: GlobalId,
    /// Metadata of the target collection. It is encoded as
    /// `storage_metadata` in protobuf.
    pub collection_meta: CollectionMetadata,
    /// What to ingest.
    pub request: OneshotIngestionRequest,
}
230
231impl RustType<ProtoRunOneshotIngestion> for RunOneshotIngestion {
232 fn into_proto(&self) -> ProtoRunOneshotIngestion {
233 ProtoRunOneshotIngestion {
234 ingestion_id: Some(self.ingestion_id.into_proto()),
235 collection_id: Some(self.collection_id.into_proto()),
236 storage_metadata: Some(self.collection_meta.into_proto()),
237 request: Some(self.request.into_proto()),
238 }
239 }
240
241 fn from_proto(proto: ProtoRunOneshotIngestion) -> Result<Self, TryFromProtoError> {
242 Ok(RunOneshotIngestion {
243 ingestion_id: proto
244 .ingestion_id
245 .into_rust_if_some("ProtoRunOneshotIngestion::ingestion_id")?,
246 collection_id: proto
247 .collection_id
248 .into_rust_if_some("ProtoRunOneshotIngestion::collection_id")?,
249 collection_meta: proto
250 .storage_metadata
251 .into_rust_if_some("ProtoRunOneshotIngestion::storage_metadata")?,
252 request: proto
253 .request
254 .into_rust_if_some("ProtoRunOneshotIngestion::request")?,
255 })
256 }
257}
258
259impl RustType<ProtoRunSinkCommand> for RunSinkCommand<mz_repr::Timestamp> {
260 fn into_proto(&self) -> ProtoRunSinkCommand {
261 ProtoRunSinkCommand {
262 id: Some(self.id.into_proto()),
263 description: Some(self.description.into_proto()),
264 }
265 }
266
267 fn from_proto(proto: ProtoRunSinkCommand) -> Result<Self, TryFromProtoError> {
268 Ok(RunSinkCommand {
269 id: proto.id.into_rust_if_some("ProtoRunSinkCommand::id")?,
270 description: proto
271 .description
272 .into_rust_if_some("ProtoRunSinkCommand::description")?,
273 })
274 }
275}
276
/// A request to run a single sink.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct RunSinkCommand<T> {
    /// The ID of the sink. When commands are partitioned, this ID starts
    /// being tracked for uppers.
    pub id: GlobalId,
    /// The sink's description.
    pub description: StorageSinkDesc<CollectionMetadata, T>,
}
283
284impl Arbitrary for RunSinkCommand<mz_repr::Timestamp> {
285 type Strategy = BoxedStrategy<Self>;
286 type Parameters = ();
287
288 fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
289 (
290 any::<GlobalId>(),
291 any::<StorageSinkDesc<CollectionMetadata, mz_repr::Timestamp>>(),
292 )
293 .prop_map(|(id, description)| Self { id, description })
294 .boxed()
295 }
296}
297
/// Protobuf encoding of [`StorageCommand`].
///
/// Inside each method, the glob imports make unqualified names such as
/// `CreateTimely` or `RunSinks` refer to the protobuf `Kind` variants (and
/// their message types), while `StorageCommand::...` names the Rust enum.
impl RustType<ProtoStorageCommand> for StorageCommand<mz_repr::Timestamp> {
    fn into_proto(&self) -> ProtoStorageCommand {
        use proto_storage_command::Kind::*;
        use proto_storage_command::*;
        ProtoStorageCommand {
            kind: Some(match self {
                StorageCommand::CreateTimely { config, epoch } => CreateTimely(ProtoCreateTimely {
                    config: Some(config.into_proto()),
                    epoch: Some(epoch.into_proto()),
                }),
                StorageCommand::InitializationComplete => InitializationComplete(()),
                StorageCommand::AllowWrites => AllowWrites(()),
                StorageCommand::UpdateConfiguration(params) => {
                    UpdateConfiguration(params.into_proto())
                }
                StorageCommand::AllowCompaction(collections) => {
                    AllowCompaction(ProtoAllowCompaction {
                        collections: collections.into_proto(),
                    })
                }
                // `RunIngestions` travels as the protobuf `CreateSources`
                // variant; the names differ but map one-to-one.
                StorageCommand::RunIngestions(sources) => CreateSources(ProtoCreateSources {
                    sources: sources.into_proto(),
                }),
                StorageCommand::RunSinks(sinks) => RunSinks(ProtoRunSinks {
                    sinks: sinks.into_proto(),
                }),
                StorageCommand::RunOneshotIngestion(ingestions) => {
                    RunOneshotIngestions(ProtoRunOneshotIngestionsCommand {
                        ingestions: ingestions.iter().map(|cmd| cmd.into_proto()).collect(),
                    })
                }
                StorageCommand::CancelOneshotIngestion { ingestions } => {
                    CancelOneshotIngestions(ProtoCancelOneshotIngestionsCommand {
                        ingestions: ingestions.iter().map(|uuid| uuid.into_proto()).collect(),
                    })
                }
            }),
        }
    }

    /// Decodes a command; a message without a `kind` is a missing-field error.
    fn from_proto(proto: ProtoStorageCommand) -> Result<Self, TryFromProtoError> {
        use proto_storage_command::Kind::*;
        use proto_storage_command::*;
        match proto.kind {
            Some(CreateTimely(ProtoCreateTimely { config, epoch })) => {
                Ok(StorageCommand::CreateTimely {
                    config: config.into_rust_if_some("ProtoCreateTimely::config")?,
                    epoch: epoch.into_rust_if_some("ProtoCreateTimely::epoch")?,
                })
            }
            Some(InitializationComplete(())) => Ok(StorageCommand::InitializationComplete),
            Some(AllowWrites(())) => Ok(StorageCommand::AllowWrites),
            Some(UpdateConfiguration(params)) => {
                Ok(StorageCommand::UpdateConfiguration(params.into_rust()?))
            }
            // Inverse of the `RunIngestions` -> `CreateSources` mapping above.
            Some(CreateSources(ProtoCreateSources { sources })) => {
                Ok(StorageCommand::RunIngestions(sources.into_rust()?))
            }
            Some(AllowCompaction(ProtoAllowCompaction { collections })) => {
                Ok(StorageCommand::AllowCompaction(collections.into_rust()?))
            }
            Some(RunSinks(ProtoRunSinks { sinks })) => {
                Ok(StorageCommand::RunSinks(sinks.into_rust()?))
            }
            Some(RunOneshotIngestions(oneshot)) => {
                // Fails on the first ingestion that does not decode.
                let ingestions = oneshot
                    .ingestions
                    .into_iter()
                    .map(|cmd| cmd.into_rust())
                    .collect::<Result<_, _>>()?;
                Ok(StorageCommand::RunOneshotIngestion(ingestions))
            }
            Some(CancelOneshotIngestions(oneshot)) => {
                let ingestions = oneshot
                    .ingestions
                    .into_iter()
                    .map(|uuid| uuid.into_rust())
                    .collect::<Result<_, _>>()?;
                Ok(StorageCommand::CancelOneshotIngestion { ingestions })
            }
            None => Err(TryFromProtoError::missing_field(
                "ProtoStorageCommand::kind",
            )),
        }
    }
}
384
385impl Arbitrary for StorageCommand<mz_repr::Timestamp> {
386 type Strategy = Union<BoxedStrategy<Self>>;
387 type Parameters = ();
388
389 fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
390 Union::new(vec![
391 proptest::collection::vec(any::<RunIngestionCommand>(), 1..4)
393 .prop_map(StorageCommand::RunIngestions)
394 .boxed(),
395 proptest::collection::vec(any::<RunSinkCommand<mz_repr::Timestamp>>(), 1..4)
396 .prop_map(StorageCommand::RunSinks)
397 .boxed(),
398 proptest::collection::vec(
399 (
400 any::<GlobalId>(),
401 proptest::collection::vec(any::<mz_repr::Timestamp>(), 1..4),
402 ),
403 1..4,
404 )
405 .prop_map(|collections| {
406 StorageCommand::AllowCompaction(
407 collections
408 .into_iter()
409 .map(|(id, frontier_vec)| (id, Antichain::from(frontier_vec)))
410 .collect(),
411 )
412 })
413 .boxed(),
414 ])
415 }
416}
417
/// The status of a storage object, as reported in [`StatusUpdate`]s.
///
/// `PartialOrd`/`Ord` are derived, so comparisons follow the declaration
/// order below; do not reorder variants casually.
#[derive(Copy, Clone, Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub enum Status {
    Starting,
    Running,
    Paused,
    Stalled,
    Ceased,
    /// Terminal: per `superseded_by`, once an object is `Dropped` only
    /// another `Dropped` update replaces it.
    Dropped,
}
430
431impl std::str::FromStr for Status {
432 type Err = anyhow::Error;
433 fn from_str(s: &str) -> Result<Self, Self::Err> {
435 Ok(match s {
436 "starting" => Status::Starting,
437 "running" => Status::Running,
438 "paused" => Status::Paused,
439 "stalled" => Status::Stalled,
440 "ceased" => Status::Ceased,
441 "dropped" => Status::Dropped,
442 s => return Err(anyhow::anyhow!("{} is not a valid status", s)),
443 })
444 }
445}
446
447impl Status {
448 pub fn to_str(&self) -> &'static str {
450 match self {
451 Status::Starting => "starting",
452 Status::Running => "running",
453 Status::Paused => "paused",
454 Status::Stalled => "stalled",
455 Status::Ceased => "ceased",
456 Status::Dropped => "dropped",
457 }
458 }
459
460 pub fn superseded_by(self, new: Status) -> bool {
463 match (self, new) {
464 (_, Status::Dropped) => true,
465 (Status::Dropped, _) => false,
466 (Status::Paused, Status::Paused) => false,
468 _ => true,
471 }
472 }
473}
474
/// A status report for one storage object.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct StatusUpdate {
    /// The object the status is about.
    pub id: GlobalId,
    /// The reported status.
    pub status: Status,
    /// When the status was observed (UTC).
    pub timestamp: chrono::DateTime<chrono::Utc>,
    /// An optional error message.
    pub error: Option<String>,
    /// Optional hints; encoded under the `hints` key of the row's details dict.
    pub hints: BTreeSet<String>,
    /// Additional errors keyed by namespace; encoded under the `namespaced`
    /// key of the row's details dict.
    pub namespaced_errors: BTreeMap<String, String>,
    /// The replica that produced the update, if known.
    pub replica_id: Option<ReplicaId>,
}
490
491impl StatusUpdate {
492 pub fn new(
493 id: GlobalId,
494 timestamp: chrono::DateTime<chrono::Utc>,
495 status: Status,
496 ) -> StatusUpdate {
497 StatusUpdate {
498 id,
499 timestamp,
500 status,
501 error: None,
502 hints: Default::default(),
503 namespaced_errors: Default::default(),
504 replica_id: None,
505 }
506 }
507}
508
/// Encodes a status update as a row whose columns are, in order:
///
/// 1. the timestamp (`TimestampTz`),
/// 2. the object ID, rendered as a string,
/// 3. the status name (`Status::to_str`),
/// 4. the error message, or NULL when there is none,
/// 5. a details dict with optional `hints` (a list) and `namespaced` (a
///    dict) entries, or NULL when both are empty,
/// 6. the replica ID, rendered as a string, or NULL.
///
/// NOTE(review): this layout presumably has to match the schema of the
/// collection these rows are written to — confirm before changing it.
impl From<StatusUpdate> for Row {
    fn from(update: StatusUpdate) -> Self {
        use mz_repr::Datum;

        // Panics if the timestamp cannot be converted for a `TimestampTz` datum.
        let timestamp = Datum::TimestampTz(update.timestamp.try_into().expect("must fit"));
        let id = update.id.to_string();
        let id = Datum::String(&id);
        let status = Datum::String(update.status.to_str());
        // NOTE(review): presumably `None` converts to `Datum::Null` — confirm.
        let error = update.error.as_deref().into();

        let mut row = Row::default();
        let mut packer = row.packer();
        packer.extend([timestamp, id, status, error]);

        if !update.hints.is_empty() || !update.namespaced_errors.is_empty() {
            packer.push_dict_with(|dict_packer| {
                // Keys are pushed as "hints" then "namespaced".
                // NOTE(review): dict keys presumably must be pushed in
                // ascending order; keep that order if keys are added.
                if !update.hints.is_empty() {
                    dict_packer.push(Datum::String("hints"));
                    dict_packer.push_list(update.hints.iter().map(|s| Datum::String(s)));
                }
                if !update.namespaced_errors.is_empty() {
                    dict_packer.push(Datum::String("namespaced"));
                    dict_packer.push_dict(
                        update
                            .namespaced_errors
                            .iter()
                            .map(|(k, v)| (k.as_str(), Datum::String(v))),
                    );
                }
            });
        } else {
            packer.push(Datum::Null);
        }

        match update.replica_id {
            Some(id) => packer.push(Datum::String(&id.to_string())),
            None => packer.push(Datum::Null),
        }

        row
    }
}
553
554impl RustType<proto_storage_response::ProtoStatus> for Status {
555 fn into_proto(&self) -> proto_storage_response::ProtoStatus {
556 use proto_storage_response::proto_status::*;
557
558 proto_storage_response::ProtoStatus {
559 kind: Some(match self {
560 Status::Starting => Kind::Starting(()),
561 Status::Running => Kind::Running(()),
562 Status::Paused => Kind::Paused(()),
563 Status::Stalled => Kind::Stalled(()),
564 Status::Ceased => Kind::Ceased(()),
565 Status::Dropped => Kind::Dropped(()),
566 }),
567 }
568 }
569
570 fn from_proto(proto: proto_storage_response::ProtoStatus) -> Result<Self, TryFromProtoError> {
571 use proto_storage_response::proto_status::*;
572 let kind = proto
573 .kind
574 .ok_or_else(|| TryFromProtoError::missing_field("ProtoStatus::kind"))?;
575
576 Ok(match kind {
577 Kind::Starting(()) => Status::Starting,
578 Kind::Running(()) => Status::Running,
579 Kind::Paused(()) => Status::Paused,
580 Kind::Stalled(()) => Status::Stalled,
581 Kind::Ceased(()) => Status::Ceased,
582 Kind::Dropped(()) => Status::Dropped,
583 })
584 }
585}
586
587impl RustType<proto_storage_response::ProtoStatusUpdate> for StatusUpdate {
588 fn into_proto(&self) -> proto_storage_response::ProtoStatusUpdate {
589 proto_storage_response::ProtoStatusUpdate {
590 id: Some(self.id.into_proto()),
591 status: Some(self.status.into_proto()),
592 timestamp: Some(self.timestamp.into_proto()),
593 error: self.error.clone(),
594 hints: self.hints.iter().cloned().collect(),
595 namespaced_errors: self.namespaced_errors.clone(),
596 replica_id: self.replica_id.map(|id| id.to_string().into_proto()),
597 }
598 }
599
600 fn from_proto(
601 proto: proto_storage_response::ProtoStatusUpdate,
602 ) -> Result<Self, TryFromProtoError> {
603 Ok(StatusUpdate {
604 id: proto.id.into_rust_if_some("ProtoStatusUpdate::id")?,
605 timestamp: proto
606 .timestamp
607 .into_rust_if_some("ProtoStatusUpdate::timestamp")?,
608 status: proto
609 .status
610 .into_rust_if_some("ProtoStatusUpdate::status")?,
611 error: proto.error,
612 hints: proto.hints.into_iter().collect(),
613 namespaced_errors: proto.namespaced_errors,
614 replica_id: proto
615 .replica_id
616 .map(|replica_id: String| replica_id.parse().expect("must be a valid replica id")),
617 })
618 }
619}
620
/// An update that `into_row` turns into a `(Row, Diff)` pair: either an
/// already-encoded row or a [`StatusUpdate`] encoded on demand.
pub enum AppendOnlyUpdate {
    /// A pre-encoded row and its diff.
    Row((Row, Diff)),
    /// A status update; `into_row` encodes it with a diff of one.
    Status(StatusUpdate),
}
626
627impl AppendOnlyUpdate {
628 pub fn into_row(self) -> (Row, Diff) {
629 match self {
630 AppendOnlyUpdate::Row((row, diff)) => (row, diff),
631 AppendOnlyUpdate::Status(status) => (Row::from(status), Diff::ONE),
632 }
633 }
634}
635
636impl From<(Row, Diff)> for AppendOnlyUpdate {
637 fn from((row, diff): (Row, Diff)) -> Self {
638 Self::Row((row, diff))
639 }
640}
641
642impl From<StatusUpdate> for AppendOnlyUpdate {
643 fn from(update: StatusUpdate) -> Self {
644 Self::Status(update)
645 }
646}
647
/// Responses sent by a storage replica.
///
/// Variant order is part of the derived serde representation; do not reorder.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub enum StorageResponse<T = mz_repr::Timestamp> {
    /// New upper frontiers for the identified collections.
    FrontierUppers(Vec<(GlobalId, Antichain<T>)>),
    /// Reports that the identified collection was dropped. When partitioned,
    /// this is forwarded only after every partition has reported it.
    DroppedId(GlobalId),
    /// Staged batches (or error messages), keyed by `Uuid`. NOTE(review): the
    /// key is presumably a oneshot ingestion's `ingestion_id`, though the code
    /// here calls it `collection_id` — confirm.
    StagedBatches(BTreeMap<uuid::Uuid, Vec<Result<ProtoBatch, String>>>),
    /// Statistics updates for sources and for sinks.
    StatisticsUpdates(Vec<SourceStatisticsUpdate>, Vec<SinkStatisticsUpdate>),
    /// Status updates for storage objects.
    StatusUpdates(Vec<StatusUpdate>),
}
664
/// Protobuf encoding of [`StorageResponse`].
///
/// Inside each method, the glob import makes unqualified names such as
/// `FrontierUppers` or `StagedBatches` refer to the protobuf `Kind` variants,
/// while `StorageResponse::...` names the Rust enum.
impl RustType<ProtoStorageResponse> for StorageResponse<mz_repr::Timestamp> {
    fn into_proto(&self) -> ProtoStorageResponse {
        use proto_storage_response::Kind::*;
        use proto_storage_response::{
            ProtoDroppedId, ProtoStagedBatches, ProtoStatisticsUpdates, ProtoStatusUpdates,
        };
        ProtoStorageResponse {
            kind: Some(match self {
                StorageResponse::FrontierUppers(traces) => FrontierUppers(traces.into_proto()),
                StorageResponse::DroppedId(id) => DroppedId(ProtoDroppedId {
                    id: Some(id.into_proto()),
                }),
                // Statistics travel in the protobuf `Stats` variant.
                StorageResponse::StatisticsUpdates(source_stats, sink_stats) => {
                    Stats(ProtoStatisticsUpdates {
                        source_updates: source_stats
                            .iter()
                            .map(|update| update.into_proto())
                            .collect(),
                        sink_updates: sink_stats
                            .iter()
                            .map(|update| update.into_proto())
                            .collect(),
                    })
                }
                StorageResponse::StatusUpdates(updates) => StatusUpdates(ProtoStatusUpdates {
                    updates: updates.into_proto(),
                }),
                // Each map entry becomes an `Inner` message carrying the key as
                // `id`; each `Result` becomes a `BatchResult` whose `value` is
                // either the batch or the error string.
                StorageResponse::StagedBatches(staged) => {
                    let batches = staged
                        .into_iter()
                        .map(|(collection_id, batches)| {
                            let batches = batches
                                .into_iter()
                                .map(|result| {
                                    use proto_storage_response::proto_staged_batches::batch_result::Value;
                                    let value = match result {
                                        Ok(batch) => Value::Batch(batch.clone()),
                                        Err(err) => Value::Error(err.clone()),
                                    };
                                    proto_storage_response::proto_staged_batches::BatchResult { value: Some(value) }
                                })
                                .collect();
                            proto_storage_response::proto_staged_batches::Inner {
                                id: Some(collection_id.into_proto()),
                                batches,
                            }
                        })
                        .collect();
                    StagedBatches(ProtoStagedBatches { batches })
                }
            }),
        }
    }

    /// Decodes a response; a message without a `kind` is a missing-field error.
    fn from_proto(proto: ProtoStorageResponse) -> Result<Self, TryFromProtoError> {
        use proto_storage_response::Kind::*;
        use proto_storage_response::{ProtoDroppedId, ProtoStatusUpdates};
        match proto.kind {
            Some(DroppedId(ProtoDroppedId { id })) => Ok(StorageResponse::DroppedId(
                id.into_rust_if_some("ProtoDroppedId::id")?,
            )),
            Some(FrontierUppers(traces)) => {
                Ok(StorageResponse::FrontierUppers(traces.into_rust()?))
            }
            Some(Stats(stats)) => Ok(StorageResponse::StatisticsUpdates(
                stats
                    .source_updates
                    .into_iter()
                    .map(|update| update.into_rust())
                    .collect::<Result<Vec<_>, TryFromProtoError>>()?,
                stats
                    .sink_updates
                    .into_iter()
                    .map(|update| update.into_rust())
                    .collect::<Result<Vec<_>, TryFromProtoError>>()?,
            )),
            Some(StatusUpdates(ProtoStatusUpdates { updates })) => {
                Ok(StorageResponse::StatusUpdates(updates.into_rust()?))
            }
            Some(StagedBatches(staged)) => {
                // Collecting into a `BTreeMap`: if the same id appears more
                // than once, the later entry replaces the earlier one.
                let batches: BTreeMap<_, _> = staged
                    .batches
                    .into_iter()
                    .map(|inner| {
                        let id = inner
                            .id
                            .into_rust_if_some("ProtoStagedBatches::Inner::id")?;

                        let mut batches = Vec::with_capacity(inner.batches.len());
                        for maybe_batch in inner.batches {
                            use proto_storage_response::proto_staged_batches::batch_result::Value;

                            // A `BatchResult` must carry either a batch or an error.
                            let value = maybe_batch.value.ok_or_else(|| {
                                TryFromProtoError::missing_field("BatchResult::value")
                            })?;
                            let batch = match value {
                                Value::Batch(batch) => Ok(batch),
                                Value::Error(err) => Err(err),
                            };
                            batches.push(batch);
                        }

                        Ok::<_, TryFromProtoError>((id, batches))
                    })
                    .collect::<Result<_, _>>()?;

                Ok(StorageResponse::StagedBatches(batches))
            }
            None => Err(TryFromProtoError::missing_field(
                "ProtoStorageResponse::kind",
            )),
        }
    }
}
779
780impl Arbitrary for StorageResponse<mz_repr::Timestamp> {
781 type Strategy = Union<BoxedStrategy<Self>>;
782 type Parameters = ();
783
784 fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
785 Union::new(vec![
787 proptest::collection::vec((any::<GlobalId>(), any_antichain()), 1..4)
788 .prop_map(StorageResponse::FrontierUppers)
789 .boxed(),
790 ])
791 }
792}
793
/// Bookkeeping for a storage client whose commands are fanned out to `parts`
/// partitions and whose responses are merged back into a single stream.
///
/// A partition's index is the `shard_id` passed to `absorb_response`.
#[derive(Debug)]
pub struct PartitionedStorageState<T> {
    /// The number of partitions commands are split across.
    parts: usize,
    /// Per collection: the combined upper across all partitions, plus each
    /// partition's last recorded upper. A partition's entry becomes `None`
    /// once it has reported the collection dropped.
    uppers: BTreeMap<GlobalId, (MutableAntichain<T>, Vec<Option<Antichain<T>>>)>,
    /// Staged-batch responses received so far, keyed by `Uuid` and then by
    /// partition index. An entry is forwarded (and removed) once every
    /// partition has responded.
    oneshot_source_responses:
        BTreeMap<uuid::Uuid, BTreeMap<usize, Vec<Result<ProtoBatch, String>>>>,
}
809
810impl<T> Partitionable<StorageCommand<T>, StorageResponse<T>>
811 for (StorageCommand<T>, StorageResponse<T>)
812where
813 T: timely::progress::Timestamp + Lattice,
814{
815 type PartitionedState = PartitionedStorageState<T>;
816
817 fn new(parts: usize) -> PartitionedStorageState<T> {
818 PartitionedStorageState {
819 parts,
820 uppers: BTreeMap::new(),
821 oneshot_source_responses: BTreeMap::new(),
822 }
823 }
824}
825
826impl<T> PartitionedStorageState<T>
827where
828 T: timely::progress::Timestamp,
829{
830 fn observe_command(&mut self, command: &StorageCommand<T>) {
831 let _ = match command {
838 StorageCommand::CreateTimely { .. } => {
839 }
843 StorageCommand::RunIngestions(ingestions) => ingestions
844 .iter()
845 .for_each(|i| self.insert_new_uppers(i.description.collection_ids())),
846 StorageCommand::RunSinks(exports) => {
847 exports.iter().for_each(|e| self.insert_new_uppers([e.id]))
848 }
849 StorageCommand::InitializationComplete
850 | StorageCommand::AllowWrites
851 | StorageCommand::UpdateConfiguration(_)
852 | StorageCommand::AllowCompaction(_)
853 | StorageCommand::RunOneshotIngestion(_)
854 | StorageCommand::CancelOneshotIngestion { .. } => {}
855 };
856 }
857
858 fn insert_new_uppers<I: IntoIterator<Item = GlobalId>>(&mut self, ids: I) {
864 for id in ids {
865 self.uppers.entry(id).or_insert_with(|| {
866 let mut frontier = MutableAntichain::new();
867 #[allow(clippy::as_conversions)]
870 frontier.update_iter(iter::once((T::minimum(), self.parts as i64)));
871 let part_frontiers = vec![Some(Antichain::from_elem(T::minimum())); self.parts];
872
873 (frontier, part_frontiers)
874 });
875 }
876 }
877}
878
879impl<T> PartitionedState<StorageCommand<T>, StorageResponse<T>> for PartitionedStorageState<T>
880where
881 T: timely::progress::Timestamp + Lattice,
882{
883 fn split_command(&mut self, command: StorageCommand<T>) -> Vec<Option<StorageCommand<T>>> {
884 self.observe_command(&command);
885
886 match command {
887 StorageCommand::CreateTimely { config, epoch } => {
888 let timely_cmds = config.split_command(self.parts);
889
890 let timely_cmds = timely_cmds
891 .into_iter()
892 .map(|config| Some(StorageCommand::CreateTimely { config, epoch }))
893 .collect();
894 timely_cmds
895 }
896 command => {
897 vec![Some(command); self.parts]
900 }
901 }
902 }
903
904 fn absorb_response(
905 &mut self,
906 shard_id: usize,
907 response: StorageResponse<T>,
908 ) -> Option<Result<StorageResponse<T>, anyhow::Error>> {
909 match response {
910 StorageResponse::FrontierUppers(list) => {
912 let mut new_uppers = Vec::new();
913
914 for (id, new_shard_upper) in list {
915 let (frontier, shard_frontiers) = match self.uppers.get_mut(&id) {
916 Some(value) => value,
917 None => panic!("Reference to absent collection: {id}"),
918 };
919 let old_upper = frontier.frontier().to_owned();
920 let shard_upper = match &mut shard_frontiers[shard_id] {
921 Some(shard_upper) => shard_upper,
922 None => panic!("Reference to absent shard {shard_id} for collection {id}"),
923 };
924 frontier.update_iter(shard_upper.iter().map(|t| (t.clone(), -1)));
925 frontier.update_iter(new_shard_upper.iter().map(|t| (t.clone(), 1)));
926 shard_upper.join_assign(&new_shard_upper);
927
928 let new_upper = frontier.frontier();
929 if PartialOrder::less_than(&old_upper.borrow(), &new_upper) {
930 new_uppers.push((id, new_upper.to_owned()));
931 }
932 }
933
934 if new_uppers.is_empty() {
935 None
936 } else {
937 Some(Ok(StorageResponse::FrontierUppers(new_uppers)))
938 }
939 }
940 StorageResponse::DroppedId(id) => {
941 let (_, shard_frontiers) = match self.uppers.get_mut(&id) {
942 Some(value) => value,
943 None => panic!("Reference to absent collection: {id}"),
944 };
945 let prev = shard_frontiers[shard_id].take();
946 assert!(
947 prev.is_some(),
948 "got double drop for {id} from shard {shard_id}"
949 );
950
951 if shard_frontiers.iter().all(Option::is_none) {
952 self.uppers.remove(&id);
953 Some(Ok(StorageResponse::DroppedId(id)))
954 } else {
955 None
956 }
957 }
958 StorageResponse::StatisticsUpdates(source_stats, sink_stats) => {
959 Some(Ok(StorageResponse::StatisticsUpdates(
963 source_stats,
964 sink_stats,
965 )))
966 }
967 StorageResponse::StatusUpdates(updates) => {
968 Some(Ok(StorageResponse::StatusUpdates(updates)))
969 }
970 StorageResponse::StagedBatches(batches) => {
971 let mut finished_batches = BTreeMap::new();
972
973 for (collection_id, batches) in batches {
974 tracing::info!(%shard_id, %collection_id, "got batch");
975
976 let entry = self
977 .oneshot_source_responses
978 .entry(collection_id)
979 .or_default();
980 let novel = entry.insert(shard_id, batches);
981 assert_none!(novel, "Duplicate oneshot source response");
982
983 if entry.len() == self.parts {
985 let entry = self
986 .oneshot_source_responses
987 .remove(&collection_id)
988 .expect("checked above");
989 let all_batches: Vec<_> = entry.into_values().flatten().collect();
990
991 finished_batches.insert(collection_id, all_batches);
992 }
993 }
994
995 if !finished_batches.is_empty() {
996 Some(Ok(StorageResponse::StagedBatches(finished_batches)))
997 } else {
998 None
999 }
1000 }
1001 }
1002 }
1003}
1004
/// A single update: a row, the timestamp it carries, and its diff.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct Update<T = mz_repr::Timestamp> {
    pub row: Row,
    pub timestamp: T,
    pub diff: Diff,
}

/// A row and its diff, without a timestamp.
///
/// NOTE(review): presumably the timestamp is assigned later by whoever
/// writes the update — confirm with callers.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct TimestamplessUpdate {
    pub row: Row,
    pub diff: Diff,
}

/// Data to be written to a table.
#[derive(Debug, Clone, PartialEq)]
pub enum TableData {
    /// Rows and their diffs.
    Rows(Vec<(Row, Diff)>),
    /// Already-built batches. The `SmallVec` stores a single batch inline
    /// without a separate heap allocation.
    Batches(SmallVec<[ProtoBatch; 1]>),
}
1032
1033impl TableData {
1034 pub fn is_empty(&self) -> bool {
1035 match self {
1036 TableData::Rows(rows) => rows.is_empty(),
1037 TableData::Batches(batches) => batches.is_empty(),
1038 }
1039 }
1040}
1041
/// Builds a persist batch out of updates that carry no timestamp of their own.
///
/// Every added update is written at `initial_ts` (`T::minimum()` when created
/// via `new`). The batch's lower bound is `initial_ts` and its upper bound is
/// `initial_ts` stepped forward once.
pub struct TimestamplessUpdateBuilder<K, V, T, D>
where
    K: Codec,
    V: Codec,
    T: Timestamp + Lattice + Codec64,
    D: Codec64,
{
    /// The underlying persist batch builder.
    builder: BatchBuilder<K, V, T, D>,
    /// The timestamp assigned to every added update.
    initial_ts: T,
}
1054
1055impl<K, V, T, D> TimestamplessUpdateBuilder<K, V, T, D>
1056where
1057 K: Debug + Codec,
1058 V: Debug + Codec,
1059 T: TimestampManipulation + Lattice + Codec64 + Sync,
1060 D: Semigroup + Ord + Codec64 + Send + Sync,
1061{
1062 pub fn new(handle: &WriteHandle<K, V, T, D>) -> Self {
1065 let initial_ts = T::minimum();
1066 let builder = handle.builder(Antichain::from_elem(initial_ts.clone()));
1067 TimestamplessUpdateBuilder {
1068 builder,
1069 initial_ts,
1070 }
1071 }
1072
1073 pub async fn add(&mut self, k: &K, v: &V, d: &D) {
1075 self.builder
1076 .add(k, v, &self.initial_ts, d)
1077 .await
1078 .expect("invalid Persist usage");
1079 }
1080
1081 pub async fn finish(self) -> ProtoBatch {
1086 let finish_ts = StepForward::step_forward(&self.initial_ts);
1087 let batch = self
1088 .builder
1089 .finish(Antichain::from_elem(finish_ts))
1090 .await
1091 .expect("invalid Persist usage");
1092
1093 batch.into_transmittable_batch()
1094 }
1095}
1096
/// Protobuf encoding of a `(collection, upper)` pair as a `ProtoTrace`, the
/// element type of `ProtoFrontierUppersKind::traces`.
impl RustType<ProtoTrace> for (GlobalId, Antichain<mz_repr::Timestamp>) {
    fn into_proto(&self) -> ProtoTrace {
        ProtoTrace {
            id: Some(self.0.into_proto()),
            upper: Some(self.1.into_proto()),
        }
    }

    fn from_proto(proto: ProtoTrace) -> Result<Self, TryFromProtoError> {
        Ok((
            proto.id.into_rust_if_some("ProtoTrace::id")?,
            proto.upper.into_rust_if_some("ProtoTrace::upper")?,
        ))
    }
}

/// Encodes the payload of `StorageResponse::FrontierUppers`.
///
/// The tuple type has two `RustType` impls (`ProtoTrace` and `ProtoCompaction`
/// in this block), so which element encoding `self.into_proto()` uses is
/// chosen by the type of the `traces` field. NOTE(review): this presumably
/// goes through mz_proto's element-wise `Vec` impl — confirm.
impl RustType<ProtoFrontierUppersKind> for Vec<(GlobalId, Antichain<mz_repr::Timestamp>)> {
    fn into_proto(&self) -> ProtoFrontierUppersKind {
        ProtoFrontierUppersKind {
            traces: self.into_proto(),
        }
    }

    fn from_proto(proto: ProtoFrontierUppersKind) -> Result<Self, TryFromProtoError> {
        proto.traces.into_rust()
    }
}

/// Protobuf encoding of a `(collection, frontier)` pair as a
/// `ProtoCompaction`, the element type of `ProtoAllowCompaction::collections`.
impl RustType<ProtoCompaction> for (GlobalId, Antichain<mz_repr::Timestamp>) {
    fn into_proto(&self) -> ProtoCompaction {
        ProtoCompaction {
            id: Some(self.0.into_proto()),
            frontier: Some(self.1.into_proto()),
        }
    }

    fn from_proto(proto: ProtoCompaction) -> Result<Self, TryFromProtoError> {
        Ok((
            proto.id.into_rust_if_some("ProtoCompaction::id")?,
            proto
                .frontier
                .into_rust_if_some("ProtoCompaction::frontier")?,
        ))
    }
}
1142
1143impl TryIntoTimelyConfig for StorageCommand {
1144 fn try_into_timely_config(self) -> Result<(TimelyConfig, ClusterStartupEpoch), Self> {
1145 match self {
1146 StorageCommand::CreateTimely { config, epoch } => Ok((config, epoch)),
1147 cmd => Err(cmd),
1148 }
1149 }
1150}
1151
#[cfg(test)]
mod tests {
    use mz_ore::assert_ok;
    use mz_proto::protobuf_roundtrip;
    use proptest::prelude::ProptestConfig;
    use proptest::proptest;

    use super::*;

    proptest! {
        // Run each property on 32 generated cases.
        #![proptest_config(ProptestConfig::with_cases(32))]

        // Every generated command must survive a protobuf encode/decode round
        // trip unchanged. Skipped under Miri.
        #[mz_ore::test]
        #[cfg_attr(miri, ignore)] fn storage_command_protobuf_roundtrip(expect in any::<StorageCommand<mz_repr::Timestamp>>() ) {
            let actual = protobuf_roundtrip::<_, ProtoStorageCommand>(&expect);
            assert_ok!(actual);
            assert_eq!(actual.unwrap(), expect);
        }

        // Same round-trip property for responses. Skipped under Miri.
        #[mz_ore::test]
        #[cfg_attr(miri, ignore)] fn storage_response_protobuf_roundtrip(expect in any::<StorageResponse<mz_repr::Timestamp>>() ) {
            let actual = protobuf_roundtrip::<_, ProtoStorageResponse>(&expect);
            assert_ok!(actual);
            assert_eq!(actual.unwrap(), expect);
        }
    }
}