mz_storage_client/
client.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

#![allow(missing_docs)]
// Tonic generates code that violates clippy lints.
// TODO: Remove this once tonic does not produce this code anymore.
#![allow(clippy::as_conversions, clippy::clone_on_ref_ptr)]

//! The public API of the storage layer.

use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Debug;
use std::iter;

use async_trait::async_trait;
use differential_dataflow::difference::Semigroup;
use differential_dataflow::lattice::Lattice;
use mz_cluster_client::ReplicaId;
use mz_cluster_client::client::TryIntoProtocolNonce;
use mz_ore::assert_none;
use mz_persist_client::batch::{BatchBuilder, ProtoBatch};
use mz_persist_client::write::WriteHandle;
use mz_persist_types::{Codec, Codec64, StepForward};
use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError};
use mz_repr::{Diff, GlobalId, Row, TimestampManipulation};
use mz_service::client::{GenericClient, Partitionable, PartitionedState};
use mz_service::grpc::{GrpcClient, GrpcServer, ProtoServiceTypes, ResponseStream};
use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::oneshot_sources::OneshotIngestionRequest;
use mz_storage_types::parameters::StorageParameters;
use mz_storage_types::sinks::StorageSinkDesc;
use mz_storage_types::sources::IngestionDescription;
use mz_timely_util::progress::any_antichain;
use proptest::prelude::{Arbitrary, any};
use proptest::strategy::{BoxedStrategy, Strategy, Union};
use serde::{Deserialize, Serialize};
use smallvec::SmallVec;
use timely::PartialOrder;
use timely::progress::Timestamp;
use timely::progress::frontier::{Antichain, MutableAntichain};
use tonic::{Request, Status as TonicStatus, Streaming};
use uuid::Uuid;

use crate::client::proto_storage_server::ProtoStorage;
use crate::metrics::ReplicaMetrics;
use crate::statistics::{SinkStatisticsUpdate, SourceStatisticsUpdate};

include!(concat!(env!("OUT_DIR"), "/mz_storage_client.client.rs"));

/// A client to a storage server.
pub trait StorageClient<T = mz_repr::Timestamp>:
    GenericClient<StorageCommand<T>, StorageResponse<T>>
{
}

impl<C, T> StorageClient<T> for C where C: GenericClient<StorageCommand<T>, StorageResponse<T>> {}

#[async_trait]
impl<T: Send> GenericClient<StorageCommand<T>, StorageResponse<T>> for Box<dyn StorageClient<T>> {
    async fn send(&mut self, cmd: StorageCommand<T>) -> Result<(), anyhow::Error> {
        (**self).send(cmd).await
    }

    /// # Cancel safety
    ///
    /// This method is cancel safe. If `recv` is used as the event in a [`tokio::select!`]
    /// statement and some other branch completes first, it is guaranteed that no messages were
    /// received by this client.
    async fn recv(&mut self) -> Result<Option<StorageResponse<T>>, anyhow::Error> {
        // `GenericClient::recv` is required to be cancel safe.
        (**self).recv().await
    }
}
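
// Example (sketch): because `recv` is cancel safe, it can be raced against a
// shutdown signal in `tokio::select!` without losing responses. The `client`,
// `handle_response`, and `shutdown` bindings here are hypothetical.
//
//     tokio::select! {
//         response = client.recv() => handle_response(response?),
//         _ = &mut shutdown => return Ok(()),
//     }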

#[derive(Debug, Clone)]
pub enum StorageProtoServiceTypes {}

impl ProtoServiceTypes for StorageProtoServiceTypes {
    type PC = ProtoStorageCommand;
    type PR = ProtoStorageResponse;
    type STATS = ReplicaMetrics;
    const URL: &'static str = "/mz_storage_client.client.ProtoStorage/CommandResponseStream";
}

pub type StorageGrpcClient = GrpcClient<StorageProtoServiceTypes>;

#[async_trait]
impl<F, G> ProtoStorage for GrpcServer<F>
where
    F: Fn() -> G + Send + Sync + 'static,
    G: StorageClient + 'static,
{
    type CommandResponseStreamStream = ResponseStream<ProtoStorageResponse>;

    async fn command_response_stream(
        &self,
        request: Request<Streaming<ProtoStorageCommand>>,
    ) -> Result<tonic::Response<Self::CommandResponseStreamStream>, TonicStatus> {
        self.forward_bidi_stream(request).await
    }
}

/// Commands related to the ingress and egress of collections.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub enum StorageCommand<T = mz_repr::Timestamp> {
    /// Transmits connection metadata before other commands are sent.
    Hello {
        nonce: Uuid,
    },
    /// Indicates that the controller has sent all commands reflecting its
    /// initial state.
    InitializationComplete,
    /// `AllowWrites` informs the replica that it can transition out of the
    /// read-only stage and into the read-write computation stage.
    /// It is now allowed to effect changes to external systems (writes).
    ///
    /// See `ComputeCommand::AllowWrites` for details. This command works
    /// analogously to the compute version.
    AllowWrites,
    /// Update storage instance configuration.
    UpdateConfiguration(Box<StorageParameters>),
    /// Run the specified ingestion dataflow.
    RunIngestion(Box<RunIngestionCommand>),
    /// Enable compaction in storage-managed collections.
    ///
    /// A collection id and a frontier after which accumulations must be correct.
    AllowCompaction(GlobalId, Antichain<T>),
    /// Run the specified sink dataflow.
    RunSink(Box<RunSinkCommand<T>>),
    /// Run a dataflow which will ingest data from an external source and only __stage__ it in
    /// Persist.
    ///
    /// Unlike regular ingestions/sources, some other component (e.g. `environmentd`) is
    /// responsible for linking the staged data into a shard.
    RunOneshotIngestion(Box<RunOneshotIngestion>),
    /// `CancelOneshotIngestion` instructs the replica to cancel the identified oneshot ingestion.
    ///
    /// It is invalid to send a [`CancelOneshotIngestion`] command that references a oneshot
    /// ingestion that was not created by a corresponding [`RunOneshotIngestion`] command before.
    /// Doing so may cause the replica to exhibit undefined behavior.
    ///
    /// [`CancelOneshotIngestion`]: crate::client::StorageCommand::CancelOneshotIngestion
    /// [`RunOneshotIngestion`]: crate::client::StorageCommand::RunOneshotIngestion
    CancelOneshotIngestion(Uuid),
}
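
// Example (sketch): the expected command flow on a new connection. Per the
// docs above, the controller identifies itself first, replays commands
// reflecting its initial state, and then signals completion; `client` and
// `initial_commands` are hypothetical.
//
//     client.send(StorageCommand::Hello { nonce: Uuid::new_v4() }).await?;
//     for cmd in initial_commands {
//         client.send(cmd).await?;
//     }
//     client.send(StorageCommand::InitializationComplete).await?;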

impl<T> StorageCommand<T> {
    /// Returns whether this command instructs the installation of storage objects.
    pub fn installs_objects(&self) -> bool {
        use StorageCommand::*;
        match self {
            Hello { .. }
            | InitializationComplete
            | AllowWrites
            | UpdateConfiguration(_)
            | AllowCompaction(_, _)
            | CancelOneshotIngestion { .. } => false,
            // TODO(cf2): Support multi-replica oneshot ingestions. At the moment,
            // returning true here means we can't run `COPY FROM` on multi-replica
            // clusters, though this should be easy enough to support.
            RunIngestion(_) | RunSink(_) | RunOneshotIngestion(_) => true,
        }
    }
}
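
// Example (sketch): one way a controller might use `installs_objects`, e.g. to
// decide which commands from its history must be replayed to a newly
// provisioned replica; `history` is hypothetical.
//
//     let replay: Vec<&StorageCommand> =
//         history.iter().filter(|cmd| cmd.installs_objects()).collect();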

/// A command that starts ingesting the given ingestion description.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct RunIngestionCommand {
    /// The id of the storage collection being ingested.
    pub id: GlobalId,
    /// The description of what source type should be ingested and what post-processing steps
    /// must be applied to the data before it is written to the storage collection.
    pub description: IngestionDescription<CollectionMetadata>,
}

impl Arbitrary for RunIngestionCommand {
    type Strategy = BoxedStrategy<Self>;
    type Parameters = ();

    fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
        (
            any::<GlobalId>(),
            any::<IngestionDescription<CollectionMetadata>>(),
        )
            .prop_map(|(id, description)| Self { id, description })
            .boxed()
    }
}

impl RustType<ProtoRunIngestionCommand> for RunIngestionCommand {
    fn into_proto(&self) -> ProtoRunIngestionCommand {
        ProtoRunIngestionCommand {
            id: Some(self.id.into_proto()),
            description: Some(self.description.into_proto()),
        }
    }

    fn from_proto(proto: ProtoRunIngestionCommand) -> Result<Self, TryFromProtoError> {
        Ok(RunIngestionCommand {
            id: proto.id.into_rust_if_some("ProtoRunIngestionCommand::id")?,
            description: proto
                .description
                .into_rust_if_some("ProtoRunIngestionCommand::description")?,
        })
    }
}
/// A command that starts a oneshot ingestion, staging batches for the given
/// collection in Persist.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct RunOneshotIngestion {
    /// The ID of the ingestion dataflow.
    pub ingestion_id: uuid::Uuid,
    /// The ID of the collection we'll stage batches for.
    pub collection_id: GlobalId,
    /// Metadata for the collection we'll stage batches for.
    pub collection_meta: CollectionMetadata,
    /// Details for the oneshot ingestion.
    pub request: OneshotIngestionRequest,
}

impl RustType<ProtoRunOneshotIngestion> for RunOneshotIngestion {
    fn into_proto(&self) -> ProtoRunOneshotIngestion {
        ProtoRunOneshotIngestion {
            ingestion_id: Some(self.ingestion_id.into_proto()),
            collection_id: Some(self.collection_id.into_proto()),
            storage_metadata: Some(self.collection_meta.into_proto()),
            request: Some(self.request.into_proto()),
        }
    }

    fn from_proto(proto: ProtoRunOneshotIngestion) -> Result<Self, TryFromProtoError> {
        Ok(RunOneshotIngestion {
            ingestion_id: proto
                .ingestion_id
                .into_rust_if_some("ProtoRunOneshotIngestion::ingestion_id")?,
            collection_id: proto
                .collection_id
                .into_rust_if_some("ProtoRunOneshotIngestion::collection_id")?,
            collection_meta: proto
                .storage_metadata
                .into_rust_if_some("ProtoRunOneshotIngestion::storage_metadata")?,
            request: proto
                .request
                .into_rust_if_some("ProtoRunOneshotIngestion::request")?,
        })
    }
}

impl RustType<ProtoRunSinkCommand> for RunSinkCommand<mz_repr::Timestamp> {
    fn into_proto(&self) -> ProtoRunSinkCommand {
        ProtoRunSinkCommand {
            id: Some(self.id.into_proto()),
            description: Some(self.description.into_proto()),
        }
    }

    fn from_proto(proto: ProtoRunSinkCommand) -> Result<Self, TryFromProtoError> {
        Ok(RunSinkCommand {
            id: proto.id.into_rust_if_some("ProtoRunSinkCommand::id")?,
            description: proto
                .description
                .into_rust_if_some("ProtoRunSinkCommand::description")?,
        })
    }
}
/// A command that starts exporting the given sink description.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct RunSinkCommand<T> {
    /// The id of the sink.
    pub id: GlobalId,
    /// The description of the sink to export.
    pub description: StorageSinkDesc<CollectionMetadata, T>,
}

impl Arbitrary for RunSinkCommand<mz_repr::Timestamp> {
    type Strategy = BoxedStrategy<Self>;
    type Parameters = ();

    fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
        (
            any::<GlobalId>(),
            any::<StorageSinkDesc<CollectionMetadata, mz_repr::Timestamp>>(),
        )
            .prop_map(|(id, description)| Self { id, description })
            .boxed()
    }
}

impl RustType<ProtoStorageCommand> for StorageCommand<mz_repr::Timestamp> {
    fn into_proto(&self) -> ProtoStorageCommand {
        use proto_storage_command::Kind::*;
        use proto_storage_command::*;
        ProtoStorageCommand {
            kind: Some(match self {
                StorageCommand::Hello { nonce } => Hello(ProtoHello {
                    nonce: Some(nonce.into_proto()),
                }),
                StorageCommand::InitializationComplete => InitializationComplete(()),
                StorageCommand::AllowWrites => AllowWrites(()),
                StorageCommand::UpdateConfiguration(params) => {
                    UpdateConfiguration(*params.into_proto())
                }
                StorageCommand::AllowCompaction(id, frontier) => AllowCompaction(ProtoCompaction {
                    id: Some(id.into_proto()),
                    frontier: Some(frontier.into_proto()),
                }),
                StorageCommand::RunIngestion(ingestion) => RunIngestion(*ingestion.into_proto()),
                StorageCommand::RunSink(sink) => RunSink(*sink.into_proto()),
                StorageCommand::RunOneshotIngestion(ingestion) => {
                    RunOneshotIngestion(*ingestion.into_proto())
                }
                StorageCommand::CancelOneshotIngestion(uuid) => {
                    CancelOneshotIngestion(uuid.into_proto())
                }
            }),
        }
    }

    fn from_proto(proto: ProtoStorageCommand) -> Result<Self, TryFromProtoError> {
        use proto_storage_command::Kind::*;
        use proto_storage_command::*;
        match proto.kind {
            Some(Hello(ProtoHello { nonce })) => Ok(StorageCommand::Hello {
                nonce: nonce.into_rust_if_some("ProtoHello::nonce")?,
            }),
            Some(InitializationComplete(())) => Ok(StorageCommand::InitializationComplete),
            Some(AllowWrites(())) => Ok(StorageCommand::AllowWrites),
            Some(UpdateConfiguration(params)) => {
                let params = Box::new(params.into_rust()?);
                Ok(StorageCommand::UpdateConfiguration(params))
            }
            Some(RunIngestion(ingestion)) => {
                let ingestion = Box::new(ingestion.into_rust()?);
                Ok(StorageCommand::RunIngestion(ingestion))
            }
            Some(AllowCompaction(ProtoCompaction { id, frontier })) => {
                Ok(StorageCommand::AllowCompaction(
                    id.into_rust_if_some("ProtoCompaction::id")?,
                    frontier.into_rust_if_some("ProtoCompaction::frontier")?,
                ))
            }
            Some(RunSink(sink)) => {
                let sink = Box::new(sink.into_rust()?);
                Ok(StorageCommand::RunSink(sink))
            }
            Some(RunOneshotIngestion(ingestion)) => {
                let ingestion = Box::new(ingestion.into_rust()?);
                Ok(StorageCommand::RunOneshotIngestion(ingestion))
            }
            Some(CancelOneshotIngestion(uuid)) => {
                Ok(StorageCommand::CancelOneshotIngestion(uuid.into_rust()?))
            }
            None => Err(TryFromProtoError::missing_field(
                "ProtoStorageCommand::kind",
            )),
        }
    }
}

impl Arbitrary for StorageCommand<mz_repr::Timestamp> {
    type Strategy = Union<BoxedStrategy<Self>>;
    type Parameters = ();

    fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
        Union::new(vec![
            any::<u128>()
                .prop_map(|n| StorageCommand::Hello {
                    nonce: Uuid::from_u128(n),
                })
                .boxed(),
            any::<RunIngestionCommand>()
                .prop_map(Box::new)
                .prop_map(StorageCommand::RunIngestion)
                .boxed(),
            any::<RunSinkCommand<mz_repr::Timestamp>>()
                .prop_map(Box::new)
                .prop_map(StorageCommand::RunSink)
                .boxed(),
            (any::<GlobalId>(), any_antichain())
                .prop_map(|(id, frontier)| StorageCommand::AllowCompaction(id, frontier))
                .boxed(),
        ])
    }
}

/// A "kind" enum for statuses tracked by the health operator.
#[derive(Copy, Clone, Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub enum Status {
    Starting,
    Running,
    Paused,
    Stalled,
    /// This status is currently unused.
    // TODO: Re-design the ceased status.
    Ceased,
    Dropped,
}

impl std::str::FromStr for Status {
    type Err = anyhow::Error;
    /// Keep in sync with [`Status::to_str`].
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        Ok(match s {
            "starting" => Status::Starting,
            "running" => Status::Running,
            "paused" => Status::Paused,
            "stalled" => Status::Stalled,
            "ceased" => Status::Ceased,
            "dropped" => Status::Dropped,
            s => return Err(anyhow::anyhow!("{} is not a valid status", s)),
        })
    }
}

impl Status {
    /// Keep in sync with `Status::from_str`.
    pub fn to_str(&self) -> &'static str {
        match self {
            Status::Starting => "starting",
            Status::Running => "running",
            Status::Paused => "paused",
            Status::Stalled => "stalled",
            Status::Ceased => "ceased",
            Status::Dropped => "dropped",
        }
    }

    /// Determines if a new status should be produced in the context of a
    /// previous status.
    pub fn superseded_by(self, new: Status) -> bool {
        match (self, new) {
            (_, Status::Dropped) => true,
            (Status::Dropped, _) => false,
            // Don't re-mark the object as paused.
            (Status::Paused, Status::Paused) => false,
            // De-duplication of other statuses is currently managed by the
            // `health_operator`.
            _ => true,
        }
    }
}
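
// Example (sketch): `Dropped` supersedes every status and is itself never
// superseded, while repeated `Paused` updates are suppressed.
//
//     assert!(Status::Running.superseded_by(Status::Dropped));
//     assert!(!Status::Dropped.superseded_by(Status::Running));
//     assert!(!Status::Paused.superseded_by(Status::Paused));
//     assert!(Status::Paused.superseded_by(Status::Running));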

/// A source or sink status update.
///
/// Represents a status update for a given object type. The contents should be
/// able to be packed into a status row that conforms to the schema for the
/// object's status history relation.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct StatusUpdate {
    pub id: GlobalId,
    pub status: Status,
    pub timestamp: chrono::DateTime<chrono::Utc>,
    pub error: Option<String>,
    pub hints: BTreeSet<String>,
    pub namespaced_errors: BTreeMap<String, String>,
    pub replica_id: Option<ReplicaId>,
}

impl StatusUpdate {
    pub fn new(
        id: GlobalId,
        timestamp: chrono::DateTime<chrono::Utc>,
        status: Status,
    ) -> StatusUpdate {
        StatusUpdate {
            id,
            timestamp,
            status,
            error: None,
            hints: Default::default(),
            namespaced_errors: Default::default(),
            replica_id: None,
        }
    }
}

impl From<StatusUpdate> for Row {
    fn from(update: StatusUpdate) -> Self {
        use mz_repr::Datum;

        let timestamp = Datum::TimestampTz(update.timestamp.try_into().expect("must fit"));
        let id = update.id.to_string();
        let id = Datum::String(&id);
        let status = Datum::String(update.status.to_str());
        let error = update.error.as_deref().into();

        let mut row = Row::default();
        let mut packer = row.packer();
        packer.extend([timestamp, id, status, error]);

        if !update.hints.is_empty() || !update.namespaced_errors.is_empty() {
            packer.push_dict_with(|dict_packer| {
                // The `hints` and `namespaced` keys are ordered, as are the
                // BTree collections they each contain.
                if !update.hints.is_empty() {
                    dict_packer.push(Datum::String("hints"));
                    dict_packer.push_list(update.hints.iter().map(|s| Datum::String(s)));
                }
                if !update.namespaced_errors.is_empty() {
                    dict_packer.push(Datum::String("namespaced"));
                    dict_packer.push_dict(
                        update
                            .namespaced_errors
                            .iter()
                            .map(|(k, v)| (k.as_str(), Datum::String(v))),
                    );
                }
            });
        } else {
            packer.push(Datum::Null);
        }

        match update.replica_id {
            Some(id) => packer.push(Datum::String(&id.to_string())),
            None => packer.push(Datum::Null),
        }

        row
    }
}
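
// Example (sketch): packing a status update into a row for the status history
// relation. Fields left at their defaults (error, hints, namespaced errors,
// replica id) pack as `Datum::Null`; `id` is assumed to be an existing
// `GlobalId`.
//
//     let update = StatusUpdate::new(id, chrono::Utc::now(), Status::Running);
//     let row = Row::from(update);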

impl RustType<proto_storage_response::ProtoStatus> for Status {
    fn into_proto(&self) -> proto_storage_response::ProtoStatus {
        use proto_storage_response::proto_status::*;

        proto_storage_response::ProtoStatus {
            kind: Some(match self {
                Status::Starting => Kind::Starting(()),
                Status::Running => Kind::Running(()),
                Status::Paused => Kind::Paused(()),
                Status::Stalled => Kind::Stalled(()),
                Status::Ceased => Kind::Ceased(()),
                Status::Dropped => Kind::Dropped(()),
            }),
        }
    }

    fn from_proto(proto: proto_storage_response::ProtoStatus) -> Result<Self, TryFromProtoError> {
        use proto_storage_response::proto_status::*;
        let kind = proto
            .kind
            .ok_or_else(|| TryFromProtoError::missing_field("ProtoStatus::kind"))?;

        Ok(match kind {
            Kind::Starting(()) => Status::Starting,
            Kind::Running(()) => Status::Running,
            Kind::Paused(()) => Status::Paused,
            Kind::Stalled(()) => Status::Stalled,
            Kind::Ceased(()) => Status::Ceased,
            Kind::Dropped(()) => Status::Dropped,
        })
    }
}

impl RustType<proto_storage_response::ProtoStatusUpdate> for StatusUpdate {
    fn into_proto(&self) -> proto_storage_response::ProtoStatusUpdate {
        proto_storage_response::ProtoStatusUpdate {
            id: Some(self.id.into_proto()),
            status: Some(self.status.into_proto()),
            timestamp: Some(self.timestamp.into_proto()),
            error: self.error.clone(),
            hints: self.hints.iter().cloned().collect(),
            namespaced_errors: self.namespaced_errors.clone(),
            replica_id: self.replica_id.map(|id| id.to_string().into_proto()),
        }
    }

    fn from_proto(
        proto: proto_storage_response::ProtoStatusUpdate,
    ) -> Result<Self, TryFromProtoError> {
        Ok(StatusUpdate {
            id: proto.id.into_rust_if_some("ProtoStatusUpdate::id")?,
            timestamp: proto
                .timestamp
                .into_rust_if_some("ProtoStatusUpdate::timestamp")?,
            status: proto
                .status
                .into_rust_if_some("ProtoStatusUpdate::status")?,
            error: proto.error,
            hints: proto.hints.into_iter().collect(),
            namespaced_errors: proto.namespaced_errors,
            replica_id: proto
                .replica_id
                .map(|replica_id: String| replica_id.parse().expect("must be a valid replica id")),
        })
    }
}

/// An update to an append-only collection.
pub enum AppendOnlyUpdate {
    Row((Row, Diff)),
    Status(StatusUpdate),
}

impl AppendOnlyUpdate {
    pub fn into_row(self) -> (Row, Diff) {
        match self {
            AppendOnlyUpdate::Row((row, diff)) => (row, diff),
            AppendOnlyUpdate::Status(status) => (Row::from(status), Diff::ONE),
        }
    }
}

impl From<(Row, Diff)> for AppendOnlyUpdate {
    fn from((row, diff): (Row, Diff)) -> Self {
        Self::Row((row, diff))
    }
}

impl From<StatusUpdate> for AppendOnlyUpdate {
    fn from(update: StatusUpdate) -> Self {
        Self::Status(update)
    }
}
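
// Example (sketch): both plain rows and status updates flatten to `(Row, Diff)`
// pairs for writing to an append-only collection; `status_update` is assumed
// to be an existing `StatusUpdate`.
//
//     let (row, diff) = AppendOnlyUpdate::from(status_update).into_row();
//     assert_eq!(diff, Diff::ONE);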

/// Responses that the storage part of a worker/dataflow can provide back to the coordinator.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub enum StorageResponse<T = mz_repr::Timestamp> {
    /// A new upper frontier for the specified identifier.
    FrontierUpper(GlobalId, Antichain<T>),
    /// Punctuation that indicates no more responses will be transmitted for the specified id.
    DroppedId(GlobalId),
    /// Batches that have been staged in Persist and may later be linked into a shard.
    StagedBatches(BTreeMap<uuid::Uuid, Vec<Result<ProtoBatch, String>>>),
    /// A list of statistics updates, currently only for sources.
    StatisticsUpdates(Vec<SourceStatisticsUpdate>, Vec<SinkStatisticsUpdate>),
    /// A status update for a source or a sink. Periodically sent from
    /// storage workers to convey the latest status information about an object.
    StatusUpdate(StatusUpdate),
}

impl RustType<ProtoStorageResponse> for StorageResponse<mz_repr::Timestamp> {
    fn into_proto(&self) -> ProtoStorageResponse {
        use proto_storage_response::Kind::*;
        use proto_storage_response::{
            ProtoDroppedId, ProtoFrontierUpper, ProtoStagedBatches, ProtoStatisticsUpdates,
        };
        ProtoStorageResponse {
            kind: Some(match self {
                StorageResponse::FrontierUpper(id, upper) => FrontierUpper(ProtoFrontierUpper {
                    id: Some(id.into_proto()),
                    upper: Some(upper.into_proto()),
                }),
                StorageResponse::DroppedId(id) => DroppedId(ProtoDroppedId {
                    id: Some(id.into_proto()),
                }),
                StorageResponse::StatisticsUpdates(source_stats, sink_stats) => {
                    Stats(ProtoStatisticsUpdates {
                        source_updates: source_stats
                            .iter()
                            .map(|update| update.into_proto())
                            .collect(),
                        sink_updates: sink_stats
                            .iter()
                            .map(|update| update.into_proto())
                            .collect(),
                    })
                }
                StorageResponse::StatusUpdate(update) => StatusUpdate(update.into_proto()),
                StorageResponse::StagedBatches(staged) => {
                    let batches = staged
                        .into_iter()
                        .map(|(collection_id, batches)| {
                            let batches = batches
                                .into_iter()
                                .map(|result| {
                                    use proto_storage_response::proto_staged_batches::batch_result::Value;
                                    let value = match result {
                                        Ok(batch) => Value::Batch(batch.clone()),
                                        Err(err) => Value::Error(err.clone()),
                                    };
                                    proto_storage_response::proto_staged_batches::BatchResult { value: Some(value) }
                                })
                                .collect();
                            proto_storage_response::proto_staged_batches::Inner {
                                id: Some(collection_id.into_proto()),
                                batches,
                            }
                        })
                        .collect();
                    StagedBatches(ProtoStagedBatches { batches })
                }
            }),
        }
    }

    fn from_proto(proto: ProtoStorageResponse) -> Result<Self, TryFromProtoError> {
        use proto_storage_response::Kind::*;
        use proto_storage_response::{ProtoDroppedId, ProtoFrontierUpper};
        match proto.kind {
            Some(DroppedId(ProtoDroppedId { id })) => Ok(StorageResponse::DroppedId(
                id.into_rust_if_some("ProtoDroppedId::id")?,
            )),
            Some(FrontierUpper(ProtoFrontierUpper { id, upper })) => {
                Ok(StorageResponse::FrontierUpper(
                    id.into_rust_if_some("ProtoFrontierUpper::id")?,
                    upper.into_rust_if_some("ProtoFrontierUpper::upper")?,
                ))
            }
            Some(Stats(stats)) => Ok(StorageResponse::StatisticsUpdates(
                stats
                    .source_updates
                    .into_iter()
                    .map(|update| update.into_rust())
                    .collect::<Result<Vec<_>, TryFromProtoError>>()?,
                stats
                    .sink_updates
                    .into_iter()
                    .map(|update| update.into_rust())
                    .collect::<Result<Vec<_>, TryFromProtoError>>()?,
            )),
            Some(StatusUpdate(update)) => Ok(StorageResponse::StatusUpdate(update.into_rust()?)),
            Some(StagedBatches(staged)) => {
                let batches: BTreeMap<_, _> = staged
                    .batches
                    .into_iter()
                    .map(|inner| {
                        let id = inner
                            .id
                            .into_rust_if_some("ProtoStagedBatches::Inner::id")?;

                        let mut batches = Vec::with_capacity(inner.batches.len());
                        for maybe_batch in inner.batches {
                            use proto_storage_response::proto_staged_batches::batch_result::Value;

                            let value = maybe_batch.value.ok_or_else(|| {
                                TryFromProtoError::missing_field("BatchResult::value")
                            })?;
                            let batch = match value {
                                Value::Batch(batch) => Ok(batch),
                                Value::Error(err) => Err(err),
                            };
                            batches.push(batch);
                        }

                        Ok::<_, TryFromProtoError>((id, batches))
                    })
                    .collect::<Result<_, _>>()?;

                Ok(StorageResponse::StagedBatches(batches))
            }
            None => Err(TryFromProtoError::missing_field(
                "ProtoStorageResponse::kind",
            )),
        }
    }
}

impl Arbitrary for StorageResponse<mz_repr::Timestamp> {
    type Strategy = Union<BoxedStrategy<Self>>;
    type Parameters = ();

    fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
        // TODO(guswynn): test `SourceStatisticsUpdates`
        Union::new(vec![
            (any::<GlobalId>(), any_antichain())
                .prop_map(|(id, upper)| StorageResponse::FrontierUpper(id, upper))
                .boxed(),
        ])
    }
}

/// Maintained state for partitioned storage clients.
///
/// This helper type unifies the responses of multiple partitioned workers in
/// order to present them as the responses of a single worker.
#[derive(Debug)]
pub struct PartitionedStorageState<T> {
    /// Number of partitions the state machine represents.
    parts: usize,
    /// Upper frontiers for sources and sinks, both unioned across all partitions and from each
    /// individual partition.
    uppers: BTreeMap<GlobalId, (MutableAntichain<T>, Vec<Option<Antichain<T>>>)>,
    /// Staged batches from oneshot sources that will get appended by `environmentd`.
    oneshot_source_responses:
        BTreeMap<uuid::Uuid, BTreeMap<usize, Vec<Result<ProtoBatch, String>>>>,
}

impl<T> Partitionable<StorageCommand<T>, StorageResponse<T>>
    for (StorageCommand<T>, StorageResponse<T>)
where
    T: timely::progress::Timestamp + Lattice,
{
    type PartitionedState = PartitionedStorageState<T>;

    fn new(parts: usize) -> PartitionedStorageState<T> {
        PartitionedStorageState {
            parts,
            uppers: BTreeMap::new(),
            oneshot_source_responses: BTreeMap::new(),
        }
    }
}

impl<T> PartitionedStorageState<T>
where
    T: timely::progress::Timestamp,
{
    fn observe_command(&mut self, command: &StorageCommand<T>) {
        // Note that `observe_command` is quite different in `mz_compute_client`.
        // Compute (currently) only sends the command to 1 process,
        // but storage fans out to all workers, allowing the storage processes
        // to self-coordinate how commands and internal commands are ordered.
        //
        // TODO(guswynn): cluster-unification: consolidate this with compute.
        let _ = match command {
            StorageCommand::Hello { .. } => {}
            StorageCommand::RunIngestion(ingestion) => {
                self.insert_new_uppers(ingestion.description.collection_ids());
            }
            StorageCommand::RunSink(export) => {
                self.insert_new_uppers([export.id]);
            }
            StorageCommand::InitializationComplete
            | StorageCommand::AllowWrites
            | StorageCommand::UpdateConfiguration(_)
            | StorageCommand::AllowCompaction(_, _)
            | StorageCommand::RunOneshotIngestion(_)
            | StorageCommand::CancelOneshotIngestion { .. } => {}
        };
    }

    /// Installs frontier tracking for the given IDs.
    ///
    /// IDs that are already tracked in `self` keep their existing frontier
    /// state; only previously unknown IDs are initialized.
    fn insert_new_uppers<I: IntoIterator<Item = GlobalId>>(&mut self, ids: I) {
        for id in ids {
            self.uppers.entry(id).or_insert_with(|| {
                let mut frontier = MutableAntichain::new();
                // TODO(guswynn): cluster-unification: fix this dangerous use of `as`, by
                // merging the types that compute and storage use.
                #[allow(clippy::as_conversions)]
                frontier.update_iter(iter::once((T::minimum(), self.parts as i64)));
                let part_frontiers = vec![Some(Antichain::from_elem(T::minimum())); self.parts];

                (frontier, part_frontiers)
            });
        }
    }
}

impl<T> PartitionedState<StorageCommand<T>, StorageResponse<T>> for PartitionedStorageState<T>
where
    T: timely::progress::Timestamp + Lattice,
{
    fn split_command(&mut self, command: StorageCommand<T>) -> Vec<Option<StorageCommand<T>>> {
        self.observe_command(&command);

        // Fan out to all processes (which will fan out to all workers).
        // StorageState manages ordering of commands internally.
        vec![Some(command); self.parts]
    }

    fn absorb_response(
        &mut self,
        shard_id: usize,
        response: StorageResponse<T>,
    ) -> Option<Result<StorageResponse<T>, anyhow::Error>> {
        match response {
            // Avoid multiple retractions of the minimum time, so that responses
            // present as updates from a single worker.
            StorageResponse::FrontierUpper(id, new_shard_upper) => {
                let (frontier, shard_frontiers) = match self.uppers.get_mut(&id) {
                    Some(value) => value,
                    None => panic!("Reference to absent collection: {id}"),
                };
                let old_upper = frontier.frontier().to_owned();
                let shard_upper = match &mut shard_frontiers[shard_id] {
                    Some(shard_upper) => shard_upper,
                    None => panic!("Reference to absent shard {shard_id} for collection {id}"),
                };
                frontier.update_iter(shard_upper.iter().map(|t| (t.clone(), -1)));
                frontier.update_iter(new_shard_upper.iter().map(|t| (t.clone(), 1)));
                shard_upper.join_assign(&new_shard_upper);

                let new_upper = frontier.frontier();
                if PartialOrder::less_than(&old_upper.borrow(), &new_upper) {
                    Some(Ok(StorageResponse::FrontierUpper(id, new_upper.to_owned())))
                } else {
                    None
                }
            }
            StorageResponse::DroppedId(id) => {
                let (_, shard_frontiers) = match self.uppers.get_mut(&id) {
                    Some(value) => value,
                    None => panic!("Reference to absent collection: {id}"),
                };
                let prev = shard_frontiers[shard_id].take();
                assert!(
                    prev.is_some(),
                    "got double drop for {id} from shard {shard_id}"
                );

                if shard_frontiers.iter().all(Option::is_none) {
                    self.uppers.remove(&id);
                    Some(Ok(StorageResponse::DroppedId(id)))
                } else {
                    None
                }
            }
            StorageResponse::StatisticsUpdates(source_stats, sink_stats) => {
                // Just forward it along; the `worker_id` should have been set in `storage_state`.
                // We _could_ consolidate across worker_ids here, but each worker only produces
                // responses periodically, so we avoid that complexity.
                Some(Ok(StorageResponse::StatisticsUpdates(
                    source_stats,
                    sink_stats,
                )))
            }
            StorageResponse::StatusUpdate(updates) => {
                Some(Ok(StorageResponse::StatusUpdate(updates)))
            }
            StorageResponse::StagedBatches(batches) => {
                let mut finished_batches = BTreeMap::new();

                for (collection_id, batches) in batches {
                    tracing::info!(%shard_id, %collection_id, "got batch");

                    let entry = self
                        .oneshot_source_responses
                        .entry(collection_id)
                        .or_default();
                    let novel = entry.insert(shard_id, batches);
                    assert_none!(novel, "Duplicate oneshot source response");

                    // Check if we've received responses from all shards.
                    if entry.len() == self.parts {
                        let entry = self
                            .oneshot_source_responses
                            .remove(&collection_id)
                            .expect("checked above");
                        let all_batches: Vec<_> = entry.into_values().flatten().collect();

                        finished_batches.insert(collection_id, all_batches);
                    }
                }

                if !finished_batches.is_empty() {
                    Some(Ok(StorageResponse::StagedBatches(finished_batches)))
                } else {
                    None
                }
            }
        }
    }
}
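
// Example (sketch): frontier unioning across two partitions. With `parts = 2`,
// the unioned upper for a collection only advances once every partition's
// upper does, so at most one `FrontierUpper` is emitted per genuine
// advancement. `state`, `id`, and the `antichain` helper are hypothetical.
//
//     // Partition 0 advances to 5; the union is still at the minimum.
//     let r0 = state.absorb_response(0, StorageResponse::FrontierUpper(id, antichain(5)));
//     assert!(r0.is_none());
//     // Partition 1 catches up; the union advances and a response is emitted.
//     let r1 = state.absorb_response(1, StorageResponse::FrontierUpper(id, antichain(5)));
//     assert!(r1.is_some());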

#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
/// A single update to be fed to a local input.
pub struct Update<T = mz_repr::Timestamp> {
    pub row: Row,
    pub timestamp: T,
    pub diff: Diff,
}

#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
/// A single update to be fed to a local input; however, the input must
/// determine the most appropriate timestamp to use.
///
/// TODO(cf2): Can we remove this and use only [`TableData`]?
pub struct TimestamplessUpdate {
    pub row: Row,
    pub diff: Diff,
}

#[derive(Debug, Clone, PartialEq)]
pub enum TableData {
    /// Rows that still need to be persisted and appended.
    ///
    /// The contained [`Row`]s are _not_ consolidated.
    Rows(Vec<(Row, Diff)>),
    /// Batches already staged in Persist, ready to be appended.
    Batches(SmallVec<[ProtoBatch; 1]>),
}

impl TableData {
    pub fn is_empty(&self) -> bool {
        match self {
            TableData::Rows(rows) => rows.is_empty(),
            TableData::Batches(batches) => batches.is_empty(),
        }
    }
}
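
// Example (sketch): table appends either carry raw rows that still need to be
// persisted or batches that were already staged in Persist; `row` is
// hypothetical.
//
//     let data = TableData::Rows(vec![(row, Diff::ONE)]);
//     assert!(!data.is_empty());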

/// A collection of timestamp-less updates. As updates are added to the builder
/// they are automatically spilled to blob storage.
pub struct TimestamplessUpdateBuilder<K, V, T, D>
where
    K: Codec,
    V: Codec,
    T: Timestamp + Lattice + Codec64,
    D: Codec64,
{
    builder: BatchBuilder<K, V, T, D>,
    initial_ts: T,
}

impl<K, V, T, D> TimestamplessUpdateBuilder<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: TimestampManipulation + Lattice + Codec64 + Sync,
    D: Semigroup + Ord + Codec64 + Send + Sync,
{
    /// Create a new [`TimestamplessUpdateBuilder`] for the shard associated
    /// with the provided [`WriteHandle`].
    pub fn new(handle: &WriteHandle<K, V, T, D>) -> Self {
        let initial_ts = T::minimum();
        let builder = handle.builder(Antichain::from_elem(initial_ts.clone()));
        TimestamplessUpdateBuilder {
            builder,
            initial_ts,
        }
    }

    /// Add a `(K, V, D)` to the staged batch.
    pub async fn add(&mut self, k: &K, v: &V, d: &D) {
        self.builder
            .add(k, v, &self.initial_ts, d)
            .await
            .expect("invalid Persist usage");
    }

    /// Finish the builder and return a [`ProtoBatch`] which can later be linked into a shard.
    ///
    /// The returned batch has nonsensical lower and upper bounds and must be rewritten before
    /// being appended to the destination shard.
    pub async fn finish(self) -> ProtoBatch {
        let finish_ts = StepForward::step_forward(&self.initial_ts);
        let batch = self
            .builder
            .finish(Antichain::from_elem(finish_ts))
            .await
            .expect("invalid Persist usage");

        batch.into_transmittable_batch()
    }
}
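
// Example (sketch): staging timestampless updates and obtaining a batch that
// can be sent over the network. `write_handle`, `key`, and `val` are
// hypothetical.
//
//     let mut builder = TimestamplessUpdateBuilder::new(&write_handle);
//     builder.add(&key, &val, &Diff::ONE).await;
//     // The batch must be re-timestamped before it is appended to the shard.
//     let batch: ProtoBatch = builder.finish().await;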

impl RustType<ProtoCompaction> for (GlobalId, Antichain<mz_repr::Timestamp>) {
    fn into_proto(&self) -> ProtoCompaction {
        ProtoCompaction {
            id: Some(self.0.into_proto()),
            frontier: Some(self.1.into_proto()),
        }
    }

    fn from_proto(proto: ProtoCompaction) -> Result<Self, TryFromProtoError> {
        Ok((
            proto.id.into_rust_if_some("ProtoCompaction::id")?,
            proto
                .frontier
                .into_rust_if_some("ProtoCompaction::frontier")?,
        ))
    }
}

impl TryIntoProtocolNonce for StorageCommand {
    fn try_into_protocol_nonce(self) -> Result<Uuid, Self> {
        match self {
            StorageCommand::Hello { nonce } => Ok(nonce),
            cmd => Err(cmd),
        }
    }
}
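
// Example (sketch): peeling the protocol nonce off the first command of a
// connection; any other command is returned unchanged. `command` and
// `handle_command` are hypothetical.
//
//     match command.try_into_protocol_nonce() {
//         Ok(nonce) => println!("new connection with nonce {nonce}"),
//         Err(other) => handle_command(other),
//     }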

#[cfg(test)]
mod tests {
    use mz_ore::assert_ok;
    use mz_proto::protobuf_roundtrip;
    use proptest::prelude::ProptestConfig;
    use proptest::proptest;

    use super::*;

    /// Test to ensure the size of the `StorageCommand` enum doesn't regress.
    #[mz_ore::test]
    fn test_storage_command_size() {
        assert_eq!(std::mem::size_of::<StorageCommand>(), 40);
    }

    /// Test to ensure the size of the `StorageResponse` enum doesn't regress.
    #[mz_ore::test]
    fn test_storage_response_size() {
        assert_eq!(std::mem::size_of::<StorageResponse>(), 120);
    }

    proptest! {
        #![proptest_config(ProptestConfig::with_cases(32))]

        #[mz_ore::test]
        #[cfg_attr(miri, ignore)] // too slow
        fn storage_command_protobuf_roundtrip(expect in any::<StorageCommand<mz_repr::Timestamp>>() ) {
            let actual = protobuf_roundtrip::<_, ProtoStorageCommand>(&expect);
            assert_ok!(actual);
            assert_eq!(actual.unwrap(), expect);
        }

        #[mz_ore::test]
        #[cfg_attr(miri, ignore)] // too slow
        fn storage_response_protobuf_roundtrip(expect in any::<StorageResponse<mz_repr::Timestamp>>() ) {
            let actual = protobuf_roundtrip::<_, ProtoStorageResponse>(&expect);
            assert_ok!(actual);
            assert_eq!(actual.unwrap(), expect);
        }
    }
}