1#![allow(missing_docs)]
11#![allow(clippy::as_conversions, clippy::clone_on_ref_ptr)]
14
15use std::collections::{BTreeMap, BTreeSet};
18use std::fmt::Debug;
19use std::iter;
20
21use async_trait::async_trait;
22use differential_dataflow::difference::Semigroup;
23use differential_dataflow::lattice::Lattice;
24use mz_cluster_client::ReplicaId;
25use mz_cluster_client::client::{ClusterStartupEpoch, TimelyConfig, TryIntoTimelyConfig};
26use mz_ore::assert_none;
27use mz_persist_client::batch::{BatchBuilder, ProtoBatch};
28use mz_persist_client::write::WriteHandle;
29use mz_persist_types::{Codec, Codec64, StepForward};
30use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError};
31use mz_repr::{Diff, GlobalId, Row, TimestampManipulation};
32use mz_service::client::{GenericClient, Partitionable, PartitionedState};
33use mz_service::grpc::{GrpcClient, GrpcServer, ProtoServiceTypes, ResponseStream};
34use mz_storage_types::controller::CollectionMetadata;
35use mz_storage_types::oneshot_sources::OneshotIngestionRequest;
36use mz_storage_types::parameters::StorageParameters;
37use mz_storage_types::sinks::StorageSinkDesc;
38use mz_storage_types::sources::IngestionDescription;
39use mz_timely_util::progress::any_antichain;
40use proptest::prelude::{Arbitrary, any};
41use proptest::strategy::{BoxedStrategy, Strategy, Union};
42use serde::{Deserialize, Serialize};
43use smallvec::SmallVec;
44use timely::PartialOrder;
45use timely::progress::Timestamp;
46use timely::progress::frontier::{Antichain, MutableAntichain};
47use tonic::{Request, Status as TonicStatus, Streaming};
48use uuid::Uuid;
49
50use crate::client::proto_storage_server::ProtoStorage;
51use crate::metrics::ReplicaMetrics;
52use crate::statistics::{SinkStatisticsUpdate, SourceStatisticsUpdate};
53
54include!(concat!(env!("OUT_DIR"), "/mz_storage_client.client.rs"));
55
/// A client to a storage server: anything that can exchange `StorageCommand`s
/// for `StorageResponse`s over the [`GenericClient`] interface.
pub trait StorageClient<T = mz_repr::Timestamp>:
    GenericClient<StorageCommand<T>, StorageResponse<T>>
{
}
61
62impl<C, T> StorageClient<T> for C where C: GenericClient<StorageCommand<T>, StorageResponse<T>> {}
63
// Forwarding impl so a boxed trait object can itself be used as a client.
#[async_trait]
impl<T: Send> GenericClient<StorageCommand<T>, StorageResponse<T>> for Box<dyn StorageClient<T>> {
    async fn send(&mut self, cmd: StorageCommand<T>) -> Result<(), anyhow::Error> {
        // Delegate to the boxed client.
        (**self).send(cmd).await
    }

    async fn recv(&mut self) -> Result<Option<StorageResponse<T>>, anyhow::Error> {
        // Delegate to the boxed client.
        (**self).recv().await
    }
}
80
/// Uninhabited marker type that selects the storage protobuf service types.
#[derive(Debug, Clone)]
pub enum StorageProtoServiceTypes {}
83
impl ProtoServiceTypes for StorageProtoServiceTypes {
    /// Protobuf command type sent to the server.
    type PC = ProtoStorageCommand;
    /// Protobuf response type received from the server.
    type PR = ProtoStorageResponse;
    /// Per-replica metrics recorded by the gRPC client.
    type STATS = ReplicaMetrics;
    /// Fully-qualified gRPC method path for the bidirectional stream.
    const URL: &'static str = "/mz_storage_client.client.ProtoStorage/CommandResponseStream";
}
90
91pub type StorageGrpcClient = GrpcClient<StorageProtoServiceTypes>;
92
// Server side of the protocol: adapt a `GrpcServer` (which produces fresh
// `StorageClient`s via `F`) into the generated `ProtoStorage` service.
#[async_trait]
impl<F, G> ProtoStorage for GrpcServer<F>
where
    F: Fn() -> G + Send + Sync + 'static,
    G: StorageClient + 'static,
{
    type CommandResponseStreamStream = ResponseStream<ProtoStorageResponse>;

    async fn command_response_stream(
        &self,
        request: Request<Streaming<ProtoStorageCommand>>,
    ) -> Result<tonic::Response<Self::CommandResponseStreamStream>, TonicStatus> {
        // All protocol handling lives in the generic bidi-stream forwarder.
        self.forward_bidi_stream(request).await
    }
}
108
/// Commands related to the ingress and egress of collections.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub enum StorageCommand<T = mz_repr::Timestamp> {
    /// Create the timely runtime according to the supplied config.
    CreateTimely {
        config: Box<TimelyConfig>,
        epoch: ClusterStartupEpoch,
    },
    /// Indicates that the controller has sent all commands reflecting its
    /// initial state.
    InitializationComplete,
    /// Allow writes to proceed.
    AllowWrites,
    /// Update storage instance configuration.
    UpdateConfiguration(Box<StorageParameters>),
    /// Run the specified ingestion dataflow.
    RunIngestion(Box<RunIngestionCommand>),
    /// Allow compaction of the identified collection up to the given frontier.
    AllowCompaction(GlobalId, Antichain<T>),
    /// Run the specified sink dataflow.
    RunSink(Box<RunSinkCommand<T>>),
    /// Run a dataflow which will ingest data from an external source and only
    /// stage it, without writing it to a collection.
    RunOneshotIngestion(Box<RunOneshotIngestion>),
    /// Cancel the identified oneshot ingestion.
    CancelOneshotIngestion(Uuid),
}
153
154impl<T> StorageCommand<T> {
155 pub fn installs_objects(&self) -> bool {
157 use StorageCommand::*;
158 match self {
159 CreateTimely { .. }
160 | InitializationComplete
161 | AllowWrites
162 | UpdateConfiguration(_)
163 | AllowCompaction(_, _)
164 | CancelOneshotIngestion { .. } => false,
165 RunIngestion(_) | RunSink(_) | RunOneshotIngestion(_) => true,
169 }
170 }
171}
172
/// A command that starts ingesting the given ingestion description.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct RunIngestionCommand {
    /// The id of the storage collection being ingested.
    pub id: GlobalId,
    /// The description of what source type should be ingested and what post-processing steps must
    /// be applied to the data before writing them down into the storage collection.
    pub description: IngestionDescription<CollectionMetadata>,
}
182
183impl Arbitrary for RunIngestionCommand {
184 type Strategy = BoxedStrategy<Self>;
185 type Parameters = ();
186
187 fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
188 (
189 any::<GlobalId>(),
190 any::<IngestionDescription<CollectionMetadata>>(),
191 )
192 .prop_map(|(id, description)| Self { id, description })
193 .boxed()
194 }
195}
196
197impl RustType<ProtoRunIngestionCommand> for RunIngestionCommand {
198 fn into_proto(&self) -> ProtoRunIngestionCommand {
199 ProtoRunIngestionCommand {
200 id: Some(self.id.into_proto()),
201 description: Some(self.description.into_proto()),
202 }
203 }
204
205 fn from_proto(proto: ProtoRunIngestionCommand) -> Result<Self, TryFromProtoError> {
206 Ok(RunIngestionCommand {
207 id: proto.id.into_rust_if_some("ProtoRunIngestionCommand::id")?,
208 description: proto
209 .description
210 .into_rust_if_some("ProtoRunIngestionCommand::description")?,
211 })
212 }
213}
214
/// A command that starts a "oneshot" ingestion, which stages batches for a
/// collection without directly appending to it.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct RunOneshotIngestion {
    /// The id of this oneshot ingestion.
    pub ingestion_id: uuid::Uuid,
    /// The id of the collection the staged batches are eventually meant for.
    pub collection_id: GlobalId,
    /// Metadata for the collection the batches are written against.
    pub collection_meta: CollectionMetadata,
    /// Details of the ingestion (what to fetch, how to decode it, etc.).
    pub request: OneshotIngestionRequest,
}
227
228impl RustType<ProtoRunOneshotIngestion> for RunOneshotIngestion {
229 fn into_proto(&self) -> ProtoRunOneshotIngestion {
230 ProtoRunOneshotIngestion {
231 ingestion_id: Some(self.ingestion_id.into_proto()),
232 collection_id: Some(self.collection_id.into_proto()),
233 storage_metadata: Some(self.collection_meta.into_proto()),
234 request: Some(self.request.into_proto()),
235 }
236 }
237
238 fn from_proto(proto: ProtoRunOneshotIngestion) -> Result<Self, TryFromProtoError> {
239 Ok(RunOneshotIngestion {
240 ingestion_id: proto
241 .ingestion_id
242 .into_rust_if_some("ProtoRunOneshotIngestion::ingestion_id")?,
243 collection_id: proto
244 .collection_id
245 .into_rust_if_some("ProtoRunOneshotIngestion::collection_id")?,
246 collection_meta: proto
247 .storage_metadata
248 .into_rust_if_some("ProtoRunOneshotIngestion::storage_metadata")?,
249 request: proto
250 .request
251 .into_rust_if_some("ProtoRunOneshotIngestion::request")?,
252 })
253 }
254}
255
256impl RustType<ProtoRunSinkCommand> for RunSinkCommand<mz_repr::Timestamp> {
257 fn into_proto(&self) -> ProtoRunSinkCommand {
258 ProtoRunSinkCommand {
259 id: Some(self.id.into_proto()),
260 description: Some(self.description.into_proto()),
261 }
262 }
263
264 fn from_proto(proto: ProtoRunSinkCommand) -> Result<Self, TryFromProtoError> {
265 Ok(RunSinkCommand {
266 id: proto.id.into_rust_if_some("ProtoRunSinkCommand::id")?,
267 description: proto
268 .description
269 .into_rust_if_some("ProtoRunSinkCommand::description")?,
270 })
271 }
272}
273
/// A command that starts exporting the given sink description.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct RunSinkCommand<T> {
    /// The id of the sink's storage collection.
    pub id: GlobalId,
    /// The description of the sink to run.
    pub description: StorageSinkDesc<CollectionMetadata, T>,
}
280
281impl Arbitrary for RunSinkCommand<mz_repr::Timestamp> {
282 type Strategy = BoxedStrategy<Self>;
283 type Parameters = ();
284
285 fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
286 (
287 any::<GlobalId>(),
288 any::<StorageSinkDesc<CollectionMetadata, mz_repr::Timestamp>>(),
289 )
290 .prop_map(|(id, description)| Self { id, description })
291 .boxed()
292 }
293}
294
impl RustType<ProtoStorageCommand> for StorageCommand<mz_repr::Timestamp> {
    /// Converts into the protobuf representation. Every variant maps onto a
    /// `proto_storage_command::Kind`; boxed payloads are dereferenced because
    /// the proto message stores them inline.
    fn into_proto(&self) -> ProtoStorageCommand {
        use proto_storage_command::Kind::*;
        use proto_storage_command::*;
        ProtoStorageCommand {
            kind: Some(match self {
                StorageCommand::CreateTimely { config, epoch } => CreateTimely(ProtoCreateTimely {
                    config: Some(*config.into_proto()),
                    epoch: Some(epoch.into_proto()),
                }),
                // Unit variants carry `()` payloads in proto.
                StorageCommand::InitializationComplete => InitializationComplete(()),
                StorageCommand::AllowWrites => AllowWrites(()),
                StorageCommand::UpdateConfiguration(params) => {
                    UpdateConfiguration(*params.into_proto())
                }
                StorageCommand::AllowCompaction(id, frontier) => AllowCompaction(ProtoCompaction {
                    id: Some(id.into_proto()),
                    frontier: Some(frontier.into_proto()),
                }),
                StorageCommand::RunIngestion(ingestion) => RunIngestion(*ingestion.into_proto()),
                StorageCommand::RunSink(sink) => RunSink(*sink.into_proto()),
                StorageCommand::RunOneshotIngestion(ingestion) => {
                    RunOneshotIngestion(*ingestion.into_proto())
                }
                StorageCommand::CancelOneshotIngestion(uuid) => {
                    CancelOneshotIngestion(uuid.into_proto())
                }
            }),
        }
    }

    /// Converts back from protobuf, re-boxing payloads that the Rust enum
    /// stores boxed and erroring on any missing required field.
    fn from_proto(proto: ProtoStorageCommand) -> Result<Self, TryFromProtoError> {
        use proto_storage_command::Kind::*;
        use proto_storage_command::*;
        match proto.kind {
            Some(CreateTimely(ProtoCreateTimely { config, epoch })) => {
                let config = Box::new(config.into_rust_if_some("ProtoCreateTimely::config")?);
                let epoch = epoch.into_rust_if_some("ProtoCreateTimely::epoch")?;
                Ok(StorageCommand::CreateTimely { config, epoch })
            }
            Some(InitializationComplete(())) => Ok(StorageCommand::InitializationComplete),
            Some(AllowWrites(())) => Ok(StorageCommand::AllowWrites),
            Some(UpdateConfiguration(params)) => {
                let params = Box::new(params.into_rust()?);
                Ok(StorageCommand::UpdateConfiguration(params))
            }
            Some(RunIngestion(ingestion)) => {
                let ingestion = Box::new(ingestion.into_rust()?);
                Ok(StorageCommand::RunIngestion(ingestion))
            }
            Some(AllowCompaction(ProtoCompaction { id, frontier })) => {
                Ok(StorageCommand::AllowCompaction(
                    id.into_rust_if_some("ProtoCompaction::id")?,
                    frontier.into_rust_if_some("ProtoCompaction::frontier")?,
                ))
            }
            Some(RunSink(sink)) => {
                let sink = Box::new(sink.into_rust()?);
                Ok(StorageCommand::RunSink(sink))
            }
            Some(RunOneshotIngestion(ingestion)) => {
                let ingestion = Box::new(ingestion.into_rust()?);
                Ok(StorageCommand::RunOneshotIngestion(ingestion))
            }
            Some(CancelOneshotIngestion(uuid)) => {
                Ok(StorageCommand::CancelOneshotIngestion(uuid.into_rust()?))
            }
            // `kind` is a required oneof; absence is a protocol error.
            None => Err(TryFromProtoError::missing_field(
                "ProtoStorageCommand::kind",
            )),
        }
    }
}
368
369impl Arbitrary for StorageCommand<mz_repr::Timestamp> {
370 type Strategy = Union<BoxedStrategy<Self>>;
371 type Parameters = ();
372
373 fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
374 Union::new(vec![
375 any::<RunIngestionCommand>()
377 .prop_map(Box::new)
378 .prop_map(StorageCommand::RunIngestion)
379 .boxed(),
380 any::<RunSinkCommand<mz_repr::Timestamp>>()
381 .prop_map(Box::new)
382 .prop_map(StorageCommand::RunSink)
383 .boxed(),
384 (any::<GlobalId>(), any_antichain())
385 .prop_map(|(id, frontier)| StorageCommand::AllowCompaction(id, frontier))
386 .boxed(),
387 ])
388 }
389}
390
/// A "kind" enum for statuses tracked by the health operator.
#[derive(Copy, Clone, Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub enum Status {
    Starting,
    Running,
    Paused,
    Stalled,
    Ceased,
    /// Used internally to signal that a collection was dropped.
    Dropped,
}
403
404impl std::str::FromStr for Status {
405 type Err = anyhow::Error;
406 fn from_str(s: &str) -> Result<Self, Self::Err> {
408 Ok(match s {
409 "starting" => Status::Starting,
410 "running" => Status::Running,
411 "paused" => Status::Paused,
412 "stalled" => Status::Stalled,
413 "ceased" => Status::Ceased,
414 "dropped" => Status::Dropped,
415 s => return Err(anyhow::anyhow!("{} is not a valid status", s)),
416 })
417 }
418}
419
420impl Status {
421 pub fn to_str(&self) -> &'static str {
423 match self {
424 Status::Starting => "starting",
425 Status::Running => "running",
426 Status::Paused => "paused",
427 Status::Stalled => "stalled",
428 Status::Ceased => "ceased",
429 Status::Dropped => "dropped",
430 }
431 }
432
433 pub fn superseded_by(self, new: Status) -> bool {
436 match (self, new) {
437 (_, Status::Dropped) => true,
438 (Status::Dropped, _) => false,
439 (Status::Paused, Status::Paused) => false,
441 _ => true,
444 }
445 }
446}
447
/// A source or sink status update, as tracked by the health operator.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct StatusUpdate {
    /// The id of the collection this status applies to.
    pub id: GlobalId,
    /// The new status.
    pub status: Status,
    /// When the status transition happened.
    pub timestamp: chrono::DateTime<chrono::Utc>,
    /// An optional error message.
    pub error: Option<String>,
    /// Optional hints shown to the user alongside the status.
    pub hints: BTreeSet<String>,
    /// Additional errors keyed by namespace.
    pub namespaced_errors: BTreeMap<String, String>,
    /// The replica that reported this status, if any.
    pub replica_id: Option<ReplicaId>,
}
463
464impl StatusUpdate {
465 pub fn new(
466 id: GlobalId,
467 timestamp: chrono::DateTime<chrono::Utc>,
468 status: Status,
469 ) -> StatusUpdate {
470 StatusUpdate {
471 id,
472 timestamp,
473 status,
474 error: None,
475 hints: Default::default(),
476 namespaced_errors: Default::default(),
477 replica_id: None,
478 }
479 }
480}
481
impl From<StatusUpdate> for Row {
    /// Renders a status update as a `Row` with columns, in order:
    /// timestamp, id (as string), status string, error, a details record
    /// (hints/namespaced errors, or null), and the replica id (or null).
    // NOTE(review): this column order presumably matches a status-history
    // relation schema defined elsewhere — confirm before reordering anything.
    fn from(update: StatusUpdate) -> Self {
        use mz_repr::Datum;

        let timestamp = Datum::TimestampTz(update.timestamp.try_into().expect("must fit"));
        let id = update.id.to_string();
        let id = Datum::String(&id);
        let status = Datum::String(update.status.to_str());
        let error = update.error.as_deref().into();

        let mut row = Row::default();
        let mut packer = row.packer();
        packer.extend([timestamp, id, status, error]);

        // Details column: a record holding hints and/or namespaced errors,
        // or null when there are neither.
        if !update.hints.is_empty() || !update.namespaced_errors.is_empty() {
            packer.push_dict_with(|dict_packer| {
                // `BTreeSet`/`BTreeMap` iteration keeps dict keys sorted,
                // as required for dict encoding ("hints" < "namespaced").
                if !update.hints.is_empty() {
                    dict_packer.push(Datum::String("hints"));
                    dict_packer.push_list(update.hints.iter().map(|s| Datum::String(s)));
                }
                if !update.namespaced_errors.is_empty() {
                    dict_packer.push(Datum::String("namespaced"));
                    dict_packer.push_dict(
                        update
                            .namespaced_errors
                            .iter()
                            .map(|(k, v)| (k.as_str(), Datum::String(v))),
                    );
                }
            });
        } else {
            packer.push(Datum::Null);
        }

        // Final column: the reporting replica, if known.
        match update.replica_id {
            Some(id) => packer.push(Datum::String(&id.to_string())),
            None => packer.push(Datum::Null),
        }

        row
    }
}
526
527impl RustType<proto_storage_response::ProtoStatus> for Status {
528 fn into_proto(&self) -> proto_storage_response::ProtoStatus {
529 use proto_storage_response::proto_status::*;
530
531 proto_storage_response::ProtoStatus {
532 kind: Some(match self {
533 Status::Starting => Kind::Starting(()),
534 Status::Running => Kind::Running(()),
535 Status::Paused => Kind::Paused(()),
536 Status::Stalled => Kind::Stalled(()),
537 Status::Ceased => Kind::Ceased(()),
538 Status::Dropped => Kind::Dropped(()),
539 }),
540 }
541 }
542
543 fn from_proto(proto: proto_storage_response::ProtoStatus) -> Result<Self, TryFromProtoError> {
544 use proto_storage_response::proto_status::*;
545 let kind = proto
546 .kind
547 .ok_or_else(|| TryFromProtoError::missing_field("ProtoStatus::kind"))?;
548
549 Ok(match kind {
550 Kind::Starting(()) => Status::Starting,
551 Kind::Running(()) => Status::Running,
552 Kind::Paused(()) => Status::Paused,
553 Kind::Stalled(()) => Status::Stalled,
554 Kind::Ceased(()) => Status::Ceased,
555 Kind::Dropped(()) => Status::Dropped,
556 })
557 }
558}
559
impl RustType<proto_storage_response::ProtoStatusUpdate> for StatusUpdate {
    /// Converts into the protobuf representation. The replica id is serialized
    /// via its string form.
    fn into_proto(&self) -> proto_storage_response::ProtoStatusUpdate {
        proto_storage_response::ProtoStatusUpdate {
            id: Some(self.id.into_proto()),
            status: Some(self.status.into_proto()),
            timestamp: Some(self.timestamp.into_proto()),
            error: self.error.clone(),
            hints: self.hints.iter().cloned().collect(),
            namespaced_errors: self.namespaced_errors.clone(),
            replica_id: self.replica_id.map(|id| id.to_string().into_proto()),
        }
    }

    /// Converts back from protobuf, erroring on missing required fields.
    fn from_proto(
        proto: proto_storage_response::ProtoStatusUpdate,
    ) -> Result<Self, TryFromProtoError> {
        Ok(StatusUpdate {
            id: proto.id.into_rust_if_some("ProtoStatusUpdate::id")?,
            timestamp: proto
                .timestamp
                .into_rust_if_some("ProtoStatusUpdate::timestamp")?,
            status: proto
                .status
                .into_rust_if_some("ProtoStatusUpdate::status")?,
            error: proto.error,
            hints: proto.hints.into_iter().collect(),
            namespaced_errors: proto.namespaced_errors,
            // NOTE(review): a malformed replica id string panics here instead
            // of returning a `TryFromProtoError` like the other fields —
            // confirm that replica ids are always trusted, internally-produced
            // values at this boundary.
            replica_id: proto
                .replica_id
                .map(|replica_id: String| replica_id.parse().expect("must be a valid replica id")),
        })
    }
}
593
/// An update for an append-only collection: either a plain row update or a
/// status update that must first be rendered as a row.
pub enum AppendOnlyUpdate {
    Row((Row, Diff)),
    Status(StatusUpdate),
}
599
600impl AppendOnlyUpdate {
601 pub fn into_row(self) -> (Row, Diff) {
602 match self {
603 AppendOnlyUpdate::Row((row, diff)) => (row, diff),
604 AppendOnlyUpdate::Status(status) => (Row::from(status), Diff::ONE),
605 }
606 }
607}
608
// Convenience conversion from a plain row update.
impl From<(Row, Diff)> for AppendOnlyUpdate {
    fn from((row, diff): (Row, Diff)) -> Self {
        Self::Row((row, diff))
    }
}
614
// Convenience conversion from a status update.
impl From<StatusUpdate> for AppendOnlyUpdate {
    fn from(update: StatusUpdate) -> Self {
        Self::Status(update)
    }
}
620
/// Responses that the storage nature of a worker/dataflow can provide back to the coordinator.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub enum StorageResponse<T = mz_repr::Timestamp> {
    /// A new upper frontier for the specified identifier.
    FrontierUpper(GlobalId, Antichain<T>),
    /// Punctuation indicating that previously sent commands are no longer in effect.
    DroppedId(GlobalId),
    /// Batches that have been staged by a oneshot ingestion, keyed by ingestion id.
    StagedBatches(BTreeMap<uuid::Uuid, Vec<Result<ProtoBatch, String>>>),
    /// A list of statistics updates, currently only for sources and sinks.
    StatisticsUpdates(Vec<SourceStatisticsUpdate>, Vec<SinkStatisticsUpdate>),
    /// A status update for a source or a sink. Periodically sent from
    /// storage workers to convey the latest status information about an object.
    StatusUpdate(StatusUpdate),
}
636
impl RustType<ProtoStorageResponse> for StorageResponse<mz_repr::Timestamp> {
    /// Converts into the protobuf representation. Every variant maps onto a
    /// `proto_storage_response::Kind`.
    fn into_proto(&self) -> ProtoStorageResponse {
        use proto_storage_response::Kind::*;
        use proto_storage_response::{
            ProtoDroppedId, ProtoFrontierUpper, ProtoStagedBatches, ProtoStatisticsUpdates,
        };
        ProtoStorageResponse {
            kind: Some(match self {
                StorageResponse::FrontierUpper(id, upper) => FrontierUpper(ProtoFrontierUpper {
                    id: Some(id.into_proto()),
                    upper: Some(upper.into_proto()),
                }),
                StorageResponse::DroppedId(id) => DroppedId(ProtoDroppedId {
                    id: Some(id.into_proto()),
                }),
                StorageResponse::StatisticsUpdates(source_stats, sink_stats) => {
                    Stats(ProtoStatisticsUpdates {
                        source_updates: source_stats
                            .iter()
                            .map(|update| update.into_proto())
                            .collect(),
                        sink_updates: sink_stats
                            .iter()
                            .map(|update| update.into_proto())
                            .collect(),
                    })
                }
                StorageResponse::StatusUpdate(update) => StatusUpdate(update.into_proto()),
                StorageResponse::StagedBatches(staged) => {
                    // Flatten the map into repeated `Inner` messages, one per
                    // ingestion id, each carrying its batch results.
                    let batches = staged
                        .into_iter()
                        .map(|(collection_id, batches)| {
                            let batches = batches
                                .into_iter()
                                .map(|result| {
                                    use proto_storage_response::proto_staged_batches::batch_result::Value;
                                    // Batch results are ok/err; both arms clone
                                    // because we only hold references here.
                                    let value = match result {
                                        Ok(batch) => Value::Batch(batch.clone()),
                                        Err(err) => Value::Error(err.clone()),
                                    };
                                    proto_storage_response::proto_staged_batches::BatchResult { value: Some(value) }
                                })
                                .collect();
                            proto_storage_response::proto_staged_batches::Inner {
                                id: Some(collection_id.into_proto()),
                                batches,
                            }
                        })
                        .collect();
                    StagedBatches(ProtoStagedBatches { batches })
                }
            }),
        }
    }

    /// Converts back from protobuf, erroring on any missing required field.
    fn from_proto(proto: ProtoStorageResponse) -> Result<Self, TryFromProtoError> {
        use proto_storage_response::Kind::*;
        use proto_storage_response::{ProtoDroppedId, ProtoFrontierUpper};
        match proto.kind {
            Some(DroppedId(ProtoDroppedId { id })) => Ok(StorageResponse::DroppedId(
                id.into_rust_if_some("ProtoDroppedId::id")?,
            )),
            Some(FrontierUpper(ProtoFrontierUpper { id, upper })) => {
                Ok(StorageResponse::FrontierUpper(
                    id.into_rust_if_some("ProtoFrontierUpper::id")?,
                    upper.into_rust_if_some("ProtoFrontierUpper::upper")?,
                ))
            }
            Some(Stats(stats)) => Ok(StorageResponse::StatisticsUpdates(
                stats
                    .source_updates
                    .into_iter()
                    .map(|update| update.into_rust())
                    .collect::<Result<Vec<_>, TryFromProtoError>>()?,
                stats
                    .sink_updates
                    .into_iter()
                    .map(|update| update.into_rust())
                    .collect::<Result<Vec<_>, TryFromProtoError>>()?,
            )),
            Some(StatusUpdate(update)) => Ok(StorageResponse::StatusUpdate(update.into_rust()?)),
            Some(StagedBatches(staged)) => {
                // Rebuild the per-ingestion map from the repeated messages.
                let batches: BTreeMap<_, _> = staged
                    .batches
                    .into_iter()
                    .map(|inner| {
                        let id = inner
                            .id
                            .into_rust_if_some("ProtoStagedBatches::Inner::id")?;

                        let mut batches = Vec::with_capacity(inner.batches.len());
                        for maybe_batch in inner.batches {
                            use proto_storage_response::proto_staged_batches::batch_result::Value;

                            let value = maybe_batch.value.ok_or_else(|| {
                                TryFromProtoError::missing_field("BatchResult::value")
                            })?;
                            let batch = match value {
                                Ok is not used here: the oneof maps directly
                            };
                            batches.push(batch);
                        }

                        Ok::<_, TryFromProtoError>((id, batches))
                    })
                    .collect::<Result<_, _>>()?;

                Ok(StorageResponse::StagedBatches(batches))
            }
            None => Err(TryFromProtoError::missing_field(
                "ProtoStorageResponse::kind",
            )),
        }
    }
}
753
754impl Arbitrary for StorageResponse<mz_repr::Timestamp> {
755 type Strategy = Union<BoxedStrategy<Self>>;
756 type Parameters = ();
757
758 fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
759 Union::new(vec![
761 (any::<GlobalId>(), any_antichain())
762 .prop_map(|(id, upper)| StorageResponse::FrontierUpper(id, upper))
763 .boxed(),
764 ])
765 }
766}
767
/// Maintains the state necessary to present a unified view of a storage
/// cluster that is sharded across `parts` timely workers.
#[derive(Debug)]
pub struct PartitionedStorageState<T> {
    /// Number of partitions the state machine represents.
    parts: usize,
    /// Per-collection upper frontiers: the overall frontier (as a
    /// `MutableAntichain` of per-part counts) plus each part's last reported
    /// upper (`None` once that part has reported the collection dropped).
    uppers: BTreeMap<GlobalId, (MutableAntichain<T>, Vec<Option<Antichain<T>>>)>,
    /// Staged-batch responses collected per oneshot ingestion, keyed by the
    /// reporting partition; drained once all `parts` have responded.
    oneshot_source_responses:
        BTreeMap<uuid::Uuid, BTreeMap<usize, Vec<Result<ProtoBatch, String>>>>,
}
783
784impl<T> Partitionable<StorageCommand<T>, StorageResponse<T>>
785 for (StorageCommand<T>, StorageResponse<T>)
786where
787 T: timely::progress::Timestamp + Lattice,
788{
789 type PartitionedState = PartitionedStorageState<T>;
790
791 fn new(parts: usize) -> PartitionedStorageState<T> {
792 PartitionedStorageState {
793 parts,
794 uppers: BTreeMap::new(),
795 oneshot_source_responses: BTreeMap::new(),
796 }
797 }
798}
799
impl<T> PartitionedStorageState<T>
where
    T: timely::progress::Timestamp,
{
    /// Updates bookkeeping in response to a command about to be broadcast:
    /// commands that install frontier-tracked objects get upper state.
    fn observe_command(&mut self, command: &StorageCommand<T>) {
        let _ = match command {
            StorageCommand::CreateTimely { .. } => {
                // No collection state to track for timely creation.
            }
            StorageCommand::RunIngestion(ingestion) => {
                // Track uppers for every collection the ingestion exports.
                self.insert_new_uppers(ingestion.description.collection_ids());
            }
            StorageCommand::RunSink(export) => {
                self.insert_new_uppers([export.id]);
            }
            // The remaining commands neither create nor drop tracked
            // collections (oneshot ingestions report via StagedBatches,
            // not frontiers).
            StorageCommand::InitializationComplete
            | StorageCommand::AllowWrites
            | StorageCommand::UpdateConfiguration(_)
            | StorageCommand::AllowCompaction(_, _)
            | StorageCommand::RunOneshotIngestion(_)
            | StorageCommand::CancelOneshotIngestion { .. } => {}
        };
    }

    /// Registers frontier tracking for `ids`, starting every part at the
    /// minimum timestamp. Ids that are already tracked are left untouched, so
    /// re-sent commands do not reset progress.
    fn insert_new_uppers<I: IntoIterator<Item = GlobalId>>(&mut self, ids: I) {
        for id in ids {
            self.uppers.entry(id).or_insert_with(|| {
                let mut frontier = MutableAntichain::new();
                // Seed the overall frontier with one count of T::minimum per
                // part, matching the per-part frontiers below.
                #[allow(clippy::as_conversions)]
                frontier.update_iter(iter::once((T::minimum(), self.parts as i64)));
                let part_frontiers = vec![Some(Antichain::from_elem(T::minimum())); self.parts];

                (frontier, part_frontiers)
            });
        }
    }
}
852
impl<T> PartitionedState<StorageCommand<T>, StorageResponse<T>> for PartitionedStorageState<T>
where
    T: timely::progress::Timestamp + Lattice,
{
    /// Splits one controller command into one command per partition.
    /// `CreateTimely` is split into per-worker timely configs; every other
    /// command is broadcast unchanged.
    fn split_command(&mut self, command: StorageCommand<T>) -> Vec<Option<StorageCommand<T>>> {
        // Record any frontier-tracking implications before fan-out.
        self.observe_command(&command);

        match command {
            StorageCommand::CreateTimely { config, epoch } => {
                let timely_cmds = config.split_command(self.parts);

                let timely_cmds = timely_cmds
                    .into_iter()
                    .map(|config| {
                        Some(StorageCommand::CreateTimely {
                            config: Box::new(config),
                            epoch,
                        })
                    })
                    .collect();
                timely_cmds
            }
            command => {
                // Everything else is sent identically to all partitions.
                vec![Some(command); self.parts]
            }
        }
    }

    /// Absorbs one partition's response, returning `Some` only when a unified
    /// response can be emitted to the controller.
    fn absorb_response(
        &mut self,
        shard_id: usize,
        response: StorageResponse<T>,
    ) -> Option<Result<StorageResponse<T>, anyhow::Error>> {
        match response {
            StorageResponse::FrontierUpper(id, new_shard_upper) => {
                let (frontier, shard_frontiers) = match self.uppers.get_mut(&id) {
                    Some(value) => value,
                    None => panic!("Reference to absent collection: {id}"),
                };
                let old_upper = frontier.frontier().to_owned();
                let shard_upper = match &mut shard_frontiers[shard_id] {
                    Some(shard_upper) => shard_upper,
                    None => panic!("Reference to absent shard {shard_id} for collection {id}"),
                };
                // Swap this shard's contribution to the overall frontier:
                // retract the old upper, add the new one, then advance the
                // recorded per-shard upper (joins, so it never regresses).
                frontier.update_iter(shard_upper.iter().map(|t| (t.clone(), -1)));
                frontier.update_iter(new_shard_upper.iter().map(|t| (t.clone(), 1)));
                shard_upper.join_assign(&new_shard_upper);

                // Only report when the overall frontier strictly advanced.
                let new_upper = frontier.frontier();
                if PartialOrder::less_than(&old_upper.borrow(), &new_upper) {
                    Some(Ok(StorageResponse::FrontierUpper(id, new_upper.to_owned())))
                } else {
                    None
                }
            }
            StorageResponse::DroppedId(id) => {
                let (_, shard_frontiers) = match self.uppers.get_mut(&id) {
                    Some(value) => value,
                    None => panic!("Reference to absent collection: {id}"),
                };
                // Mark this shard as dropped; `take` also guards against a
                // duplicate drop from the same shard.
                let prev = shard_frontiers[shard_id].take();
                assert!(
                    prev.is_some(),
                    "got double drop for {id} from shard {shard_id}"
                );

                // Only report the drop once every shard has reported it.
                if shard_frontiers.iter().all(Option::is_none) {
                    self.uppers.remove(&id);
                    Some(Ok(StorageResponse::DroppedId(id)))
                } else {
                    None
                }
            }
            StorageResponse::StatisticsUpdates(source_stats, sink_stats) => {
                // Statistics are passed through per shard without merging.
                Some(Ok(StorageResponse::StatisticsUpdates(
                    source_stats,
                    sink_stats,
                )))
            }
            StorageResponse::StatusUpdate(updates) => {
                // Status updates are passed through unmodified.
                Some(Ok(StorageResponse::StatusUpdate(updates)))
            }
            StorageResponse::StagedBatches(batches) => {
                let mut finished_batches = BTreeMap::new();

                for (collection_id, batches) in batches {
                    tracing::info!(%shard_id, %collection_id, "got batch");

                    // Stash this shard's batches; an ingestion is complete
                    // only once all shards have responded.
                    let entry = self
                        .oneshot_source_responses
                        .entry(collection_id)
                        .or_default();
                    let novel = entry.insert(shard_id, batches);
                    assert_none!(novel, "Duplicate oneshot source response");

                    if entry.len() == self.parts {
                        let entry = self
                            .oneshot_source_responses
                            .remove(&collection_id)
                            .expect("checked above");
                        let all_batches: Vec<_> = entry.into_values().flatten().collect();

                        finished_batches.insert(collection_id, all_batches);
                    }
                }

                if !finished_batches.is_empty() {
                    Some(Ok(StorageResponse::StagedBatches(finished_batches)))
                } else {
                    None
                }
            }
        }
    }
}
975
/// A timestamped row update with a difference.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct Update<T = mz_repr::Timestamp> {
    pub row: Row,
    pub timestamp: T,
    pub diff: Diff,
}
983
/// A row update without a timestamp; the timestamp is chosen at write time.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct TimestamplessUpdate {
    pub row: Row,
    pub diff: Diff,
}
993
/// Data for a table write: either inline rows or already-staged persist batches.
#[derive(Debug, Clone, PartialEq)]
pub enum TableData {
    /// Rows that still need to be persisted and appended.
    Rows(Vec<(Row, Diff)>),
    /// Batches already staged in persist that just need appending.
    Batches(SmallVec<[ProtoBatch; 1]>),
}
1003
1004impl TableData {
1005 pub fn is_empty(&self) -> bool {
1006 match self {
1007 TableData::Rows(rows) => rows.is_empty(),
1008 TableData::Batches(batches) => batches.is_empty(),
1009 }
1010 }
1011}
1012
/// A batch builder that writes all updates at a single placeholder timestamp,
/// deferring the real timestamp choice to append time.
pub struct TimestamplessUpdateBuilder<K, V, T, D>
where
    K: Codec,
    V: Codec,
    T: Timestamp + Lattice + Codec64,
    D: Codec64,
{
    // Underlying persist batch builder.
    builder: BatchBuilder<K, V, T, D>,
    // The placeholder timestamp every update is written at.
    initial_ts: T,
}
1025
impl<K, V, T, D> TimestamplessUpdateBuilder<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: TimestampManipulation + Lattice + Codec64 + Sync,
    D: Semigroup + Ord + Codec64 + Send + Sync,
{
    /// Creates a builder that stages updates for the shard behind `handle`,
    /// writing everything at `T::minimum()`.
    pub fn new(handle: &WriteHandle<K, V, T, D>) -> Self {
        let initial_ts = T::minimum();
        let builder = handle.builder(Antichain::from_elem(initial_ts.clone()));
        TimestamplessUpdateBuilder {
            builder,
            initial_ts,
        }
    }

    /// Adds one `(k, v, d)` update to the batch at the placeholder timestamp.
    ///
    /// # Panics
    /// Panics on invalid persist usage (e.g. an update beyond the upper).
    pub async fn add(&mut self, k: &K, v: &V, d: &D) {
        self.builder
            .add(k, v, &self.initial_ts, d)
            .await
            .expect("invalid Persist usage");
    }

    /// Finishes the batch with an upper one step past the placeholder
    /// timestamp and returns it in transmittable (proto) form.
    // NOTE(review): callers presumably re-timestamp the batch when appending;
    // confirm against the append path.
    pub async fn finish(self) -> ProtoBatch {
        let finish_ts = StepForward::step_forward(&self.initial_ts);
        let batch = self
            .builder
            .finish(Antichain::from_elem(finish_ts))
            .await
            .expect("invalid Persist usage");

        batch.into_transmittable_batch()
    }
}
1067
1068impl RustType<ProtoCompaction> for (GlobalId, Antichain<mz_repr::Timestamp>) {
1069 fn into_proto(&self) -> ProtoCompaction {
1070 ProtoCompaction {
1071 id: Some(self.0.into_proto()),
1072 frontier: Some(self.1.into_proto()),
1073 }
1074 }
1075
1076 fn from_proto(proto: ProtoCompaction) -> Result<Self, TryFromProtoError> {
1077 Ok((
1078 proto.id.into_rust_if_some("ProtoCompaction::id")?,
1079 proto
1080 .frontier
1081 .into_rust_if_some("ProtoCompaction::frontier")?,
1082 ))
1083 }
1084}
1085
1086impl TryIntoTimelyConfig for StorageCommand {
1087 fn try_into_timely_config(self) -> Result<(TimelyConfig, ClusterStartupEpoch), Self> {
1088 match self {
1089 StorageCommand::CreateTimely { config, epoch } => Ok((*config, epoch)),
1090 cmd => Err(cmd),
1091 }
1092 }
1093}
1094
#[cfg(test)]
mod tests {
    use mz_ore::assert_ok;
    use mz_proto::protobuf_roundtrip;
    use proptest::prelude::ProptestConfig;
    use proptest::proptest;

    use super::*;

    /// Guard against accidental growth of the command enum; growing it means
    /// every command clone/move gets more expensive.
    #[mz_ore::test]
    fn test_storage_command_size() {
        assert_eq!(std::mem::size_of::<StorageCommand>(), 40);
    }

    /// Same size guard for responses.
    #[mz_ore::test]
    fn test_storage_response_size() {
        assert_eq!(std::mem::size_of::<StorageResponse>(), 120);
    }

    proptest! {
        #![proptest_config(ProptestConfig::with_cases(32))]

        // Round-trip commands through their protobuf encoding; only the
        // variants produced by the `Arbitrary` impl are exercised.
        #[mz_ore::test]
        #[cfg_attr(miri, ignore)] fn storage_command_protobuf_roundtrip(expect in any::<StorageCommand<mz_repr::Timestamp>>() ) {
            let actual = protobuf_roundtrip::<_, ProtoStorageCommand>(&expect);
            assert_ok!(actual);
            assert_eq!(actual.unwrap(), expect);
        }

        // Same round-trip property for responses.
        #[mz_ore::test]
        #[cfg_attr(miri, ignore)] fn storage_response_protobuf_roundtrip(expect in any::<StorageResponse<mz_repr::Timestamp>>() ) {
            let actual = protobuf_roundtrip::<_, ProtoStorageResponse>(&expect);
            assert_ok!(actual);
            assert_eq!(actual.unwrap(), expect);
        }
    }
}