mz_storage_client/client.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

#![allow(missing_docs)]
// Tonic generates code that violates clippy lints.
// TODO: Remove this once tonic does not produce this code anymore.
#![allow(clippy::as_conversions, clippy::clone_on_ref_ptr)]

//! The public API of the storage layer.

use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Debug;
use std::iter;

use async_trait::async_trait;
use differential_dataflow::difference::Semigroup;
use differential_dataflow::lattice::Lattice;
use mz_cluster_client::ReplicaId;
use mz_cluster_client::client::{ClusterStartupEpoch, TimelyConfig, TryIntoTimelyConfig};
use mz_ore::assert_none;
use mz_persist_client::batch::{BatchBuilder, ProtoBatch};
use mz_persist_client::write::WriteHandle;
use mz_persist_types::{Codec, Codec64, StepForward};
use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError};
use mz_repr::{Diff, GlobalId, Row, TimestampManipulation};
use mz_service::client::{GenericClient, Partitionable, PartitionedState};
use mz_service::grpc::{GrpcClient, GrpcServer, ProtoServiceTypes, ResponseStream};
use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::oneshot_sources::OneshotIngestionRequest;
use mz_storage_types::parameters::StorageParameters;
use mz_storage_types::sinks::StorageSinkDesc;
use mz_storage_types::sources::IngestionDescription;
use mz_timely_util::progress::any_antichain;
use proptest::prelude::{Arbitrary, any};
use proptest::strategy::{BoxedStrategy, Strategy, Union};
use serde::{Deserialize, Serialize};
use smallvec::SmallVec;
use timely::PartialOrder;
use timely::progress::Timestamp;
use timely::progress::frontier::{Antichain, MutableAntichain};
use tonic::{Request, Status as TonicStatus, Streaming};
use uuid::Uuid;

use crate::client::proto_storage_server::ProtoStorage;
use crate::metrics::ReplicaMetrics;
use crate::statistics::{SinkStatisticsUpdate, SourceStatisticsUpdate};

include!(concat!(env!("OUT_DIR"), "/mz_storage_client.client.rs"));

/// A client to a storage server.
pub trait StorageClient<T = mz_repr::Timestamp>:
    GenericClient<StorageCommand<T>, StorageResponse<T>>
{
}

impl<C, T> StorageClient<T> for C where C: GenericClient<StorageCommand<T>, StorageResponse<T>> {}

#[async_trait]
impl<T: Send> GenericClient<StorageCommand<T>, StorageResponse<T>> for Box<dyn StorageClient<T>> {
    async fn send(&mut self, cmd: StorageCommand<T>) -> Result<(), anyhow::Error> {
        (**self).send(cmd).await
    }

    /// # Cancel safety
    ///
    /// This method is cancel safe. If `recv` is used as the event in a [`tokio::select!`]
    /// statement and some other branch completes first, it is guaranteed that no messages were
    /// received by this client.
    async fn recv(&mut self) -> Result<Option<StorageResponse<T>>, anyhow::Error> {
        // `GenericClient::recv` is required to be cancel safe.
        (**self).recv().await
    }
}

#[derive(Debug, Clone)]
pub enum StorageProtoServiceTypes {}

impl ProtoServiceTypes for StorageProtoServiceTypes {
    type PC = ProtoStorageCommand;
    type PR = ProtoStorageResponse;
    type STATS = ReplicaMetrics;
    const URL: &'static str = "/mz_storage_client.client.ProtoStorage/CommandResponseStream";
}

pub type StorageGrpcClient = GrpcClient<StorageProtoServiceTypes>;

#[async_trait]
impl<F, G> ProtoStorage for GrpcServer<F>
where
    F: Fn() -> G + Send + Sync + 'static,
    G: StorageClient + 'static,
{
    type CommandResponseStreamStream = ResponseStream<ProtoStorageResponse>;

    async fn command_response_stream(
        &self,
        request: Request<Streaming<ProtoStorageCommand>>,
    ) -> Result<tonic::Response<Self::CommandResponseStreamStream>, TonicStatus> {
        self.forward_bidi_stream(request).await
    }
}

/// Commands related to the ingress and egress of collections.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub enum StorageCommand<T = mz_repr::Timestamp> {
    /// Specifies to the storage server(s) the shape of the timely cluster
    /// we want created, before other commands are sent.
    CreateTimely {
        config: TimelyConfig,
        epoch: ClusterStartupEpoch,
    },
    /// Indicates that the controller has sent all commands reflecting its
    /// initial state.
    InitializationComplete,
    /// `AllowWrites` informs the replica that it can transition out of the
    /// read-only stage and into the read-write computation stage.
    /// It is now allowed to effect changes to external systems (writes).
    ///
    /// See `ComputeCommand::AllowWrites` for details. This command works
    /// analogously to the compute version.
    AllowWrites,
    /// Update storage instance configuration.
    UpdateConfiguration(StorageParameters),
    /// Run the specified ingestion dataflow.
    RunIngestion(RunIngestionCommand),
    /// Enable compaction in storage-managed collections.
    ///
    /// A collection id and a frontier after which accumulations must be correct.
    AllowCompaction(GlobalId, Antichain<T>),
    /// Run the specified sink dataflow.
    RunSink(RunSinkCommand<T>),
    /// Run a dataflow which will ingest data from an external source and only __stage__ it in
    /// Persist.
    ///
    /// Unlike regular ingestions/sources, some other component (e.g. `environmentd`) is
    /// responsible for linking the staged data into a shard.
    RunOneshotIngestion(Vec<RunOneshotIngestion>),
    /// `CancelOneshotIngestion` instructs the replica to cancel the identified oneshot ingestions.
    ///
    /// It is invalid to send a [`CancelOneshotIngestion`] command that references a oneshot
    /// ingestion that was not created by a corresponding [`RunOneshotIngestion`] command before.
    /// Doing so may cause the replica to exhibit undefined behavior.
    ///
    /// [`CancelOneshotIngestion`]: crate::client::StorageCommand::CancelOneshotIngestion
    /// [`RunOneshotIngestion`]: crate::client::StorageCommand::RunOneshotIngestion
    CancelOneshotIngestion {
        ingestions: Vec<Uuid>,
    },
}

impl<T> StorageCommand<T> {
    /// Returns whether this command instructs the installation of storage objects.
    pub fn installs_objects(&self) -> bool {
        use StorageCommand::*;
        match self {
            CreateTimely { .. }
            | InitializationComplete
            | AllowWrites
            | UpdateConfiguration(_)
            | AllowCompaction(_, _)
            | CancelOneshotIngestion { .. } => false,
            // TODO(cf2): multi-replica oneshot ingestions. At the moment returning
            // true here means we can't run `COPY FROM` on multi-replica clusters;
            // this should be easy enough to support, though.
            RunIngestion(_) | RunSink(_) | RunOneshotIngestion(_) => true,
        }
    }
}
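
// A minimal illustrative sketch of `installs_objects`: control-plane commands like
// `InitializationComplete` and `AllowWrites` install nothing, while `RunIngestion`,
// `RunSink`, and `RunOneshotIngestion` do. Only unit variants are exercised here to
// avoid constructing full ingestion or sink descriptions.
#[cfg(test)]
#[mz_ore::test]
fn installs_objects_examples() {
    let init: StorageCommand<mz_repr::Timestamp> = StorageCommand::InitializationComplete;
    assert!(!init.installs_objects());
    let writes: StorageCommand<mz_repr::Timestamp> = StorageCommand::AllowWrites;
    assert!(!writes.installs_objects());
}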

/// A command that starts ingesting the given ingestion description.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct RunIngestionCommand {
    /// The id of the storage collection being ingested.
    pub id: GlobalId,
    /// The description of what source type should be ingested and what post-processing steps must
    /// be applied to the data before writing it down into the storage collection.
    pub description: IngestionDescription<CollectionMetadata>,
}

impl Arbitrary for RunIngestionCommand {
    type Strategy = BoxedStrategy<Self>;
    type Parameters = ();

    fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
        (
            any::<GlobalId>(),
            any::<IngestionDescription<CollectionMetadata>>(),
        )
            .prop_map(|(id, description)| Self { id, description })
            .boxed()
    }
}

impl RustType<ProtoRunIngestionCommand> for RunIngestionCommand {
    fn into_proto(&self) -> ProtoRunIngestionCommand {
        ProtoRunIngestionCommand {
            id: Some(self.id.into_proto()),
            description: Some(self.description.into_proto()),
        }
    }

    fn from_proto(proto: ProtoRunIngestionCommand) -> Result<Self, TryFromProtoError> {
        Ok(RunIngestionCommand {
            id: proto.id.into_rust_if_some("ProtoRunIngestionCommand::id")?,
            description: proto
                .description
                .into_rust_if_some("ProtoRunIngestionCommand::description")?,
        })
    }
}

/// A command that starts a oneshot ingestion with the given description.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct RunOneshotIngestion {
    /// The ID of the ingestion dataflow.
    pub ingestion_id: uuid::Uuid,
    /// The ID of the collection we'll stage batches for.
    pub collection_id: GlobalId,
    /// Metadata for the collection we'll stage batches for.
    pub collection_meta: CollectionMetadata,
    /// Details for the oneshot ingestion.
    pub request: OneshotIngestionRequest,
}

impl RustType<ProtoRunOneshotIngestion> for RunOneshotIngestion {
    fn into_proto(&self) -> ProtoRunOneshotIngestion {
        ProtoRunOneshotIngestion {
            ingestion_id: Some(self.ingestion_id.into_proto()),
            collection_id: Some(self.collection_id.into_proto()),
            storage_metadata: Some(self.collection_meta.into_proto()),
            request: Some(self.request.into_proto()),
        }
    }

    fn from_proto(proto: ProtoRunOneshotIngestion) -> Result<Self, TryFromProtoError> {
        Ok(RunOneshotIngestion {
            ingestion_id: proto
                .ingestion_id
                .into_rust_if_some("ProtoRunOneshotIngestion::ingestion_id")?,
            collection_id: proto
                .collection_id
                .into_rust_if_some("ProtoRunOneshotIngestion::collection_id")?,
            collection_meta: proto
                .storage_metadata
                .into_rust_if_some("ProtoRunOneshotIngestion::storage_metadata")?,
            request: proto
                .request
                .into_rust_if_some("ProtoRunOneshotIngestion::request")?,
        })
    }
}

impl RustType<ProtoRunSinkCommand> for RunSinkCommand<mz_repr::Timestamp> {
    fn into_proto(&self) -> ProtoRunSinkCommand {
        ProtoRunSinkCommand {
            id: Some(self.id.into_proto()),
            description: Some(self.description.into_proto()),
        }
    }

    fn from_proto(proto: ProtoRunSinkCommand) -> Result<Self, TryFromProtoError> {
        Ok(RunSinkCommand {
            id: proto.id.into_rust_if_some("ProtoRunSinkCommand::id")?,
            description: proto
                .description
                .into_rust_if_some("ProtoRunSinkCommand::description")?,
        })
    }
}

/// A command that starts exporting the given sink description.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct RunSinkCommand<T> {
    pub id: GlobalId,
    pub description: StorageSinkDesc<CollectionMetadata, T>,
}

impl Arbitrary for RunSinkCommand<mz_repr::Timestamp> {
    type Strategy = BoxedStrategy<Self>;
    type Parameters = ();

    fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
        (
            any::<GlobalId>(),
            any::<StorageSinkDesc<CollectionMetadata, mz_repr::Timestamp>>(),
        )
            .prop_map(|(id, description)| Self { id, description })
            .boxed()
    }
}

impl RustType<ProtoStorageCommand> for StorageCommand<mz_repr::Timestamp> {
    fn into_proto(&self) -> ProtoStorageCommand {
        use proto_storage_command::Kind::*;
        use proto_storage_command::*;
        ProtoStorageCommand {
            kind: Some(match self {
                StorageCommand::CreateTimely { config, epoch } => CreateTimely(ProtoCreateTimely {
                    config: Some(config.into_proto()),
                    epoch: Some(epoch.into_proto()),
                }),
                StorageCommand::InitializationComplete => InitializationComplete(()),
                StorageCommand::AllowWrites => AllowWrites(()),
                StorageCommand::UpdateConfiguration(params) => {
                    UpdateConfiguration(params.into_proto())
                }
                StorageCommand::AllowCompaction(id, frontier) => AllowCompaction(ProtoCompaction {
                    id: Some(id.into_proto()),
                    frontier: Some(frontier.into_proto()),
                }),
                StorageCommand::RunIngestion(ingestion) => RunIngestion(ingestion.into_proto()),
                StorageCommand::RunSink(sink) => RunSink(sink.into_proto()),
                StorageCommand::RunOneshotIngestion(ingestions) => {
                    RunOneshotIngestions(ProtoRunOneshotIngestionsCommand {
                        ingestions: ingestions.iter().map(|cmd| cmd.into_proto()).collect(),
                    })
                }
                StorageCommand::CancelOneshotIngestion { ingestions } => {
                    CancelOneshotIngestions(ProtoCancelOneshotIngestionsCommand {
                        ingestions: ingestions.iter().map(|uuid| uuid.into_proto()).collect(),
                    })
                }
            }),
        }
    }

    fn from_proto(proto: ProtoStorageCommand) -> Result<Self, TryFromProtoError> {
        use proto_storage_command::Kind::*;
        use proto_storage_command::*;
        match proto.kind {
            Some(CreateTimely(ProtoCreateTimely { config, epoch })) => {
                Ok(StorageCommand::CreateTimely {
                    config: config.into_rust_if_some("ProtoCreateTimely::config")?,
                    epoch: epoch.into_rust_if_some("ProtoCreateTimely::epoch")?,
                })
            }
            Some(InitializationComplete(())) => Ok(StorageCommand::InitializationComplete),
            Some(AllowWrites(())) => Ok(StorageCommand::AllowWrites),
            Some(UpdateConfiguration(params)) => {
                Ok(StorageCommand::UpdateConfiguration(params.into_rust()?))
            }
            Some(RunIngestion(ingestion)) => {
                Ok(StorageCommand::RunIngestion(ingestion.into_rust()?))
            }
            Some(AllowCompaction(ProtoCompaction { id, frontier })) => {
                Ok(StorageCommand::AllowCompaction(
                    id.into_rust_if_some("ProtoCompaction::id")?,
                    frontier.into_rust_if_some("ProtoCompaction::frontier")?,
                ))
            }
            Some(RunSink(sink)) => Ok(StorageCommand::RunSink(sink.into_rust()?)),
            Some(RunOneshotIngestions(oneshot)) => {
                let ingestions = oneshot
                    .ingestions
                    .into_iter()
                    .map(|cmd| cmd.into_rust())
                    .collect::<Result<_, _>>()?;
                Ok(StorageCommand::RunOneshotIngestion(ingestions))
            }
            Some(CancelOneshotIngestions(oneshot)) => {
                let ingestions = oneshot
                    .ingestions
                    .into_iter()
                    .map(|uuid| uuid.into_rust())
                    .collect::<Result<_, _>>()?;
                Ok(StorageCommand::CancelOneshotIngestion { ingestions })
            }
            None => Err(TryFromProtoError::missing_field(
                "ProtoStorageCommand::kind",
            )),
        }
    }
}

impl Arbitrary for StorageCommand<mz_repr::Timestamp> {
    type Strategy = Union<BoxedStrategy<Self>>;
    type Parameters = ();

    fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
        Union::new(vec![
            // TODO(guswynn): cluster-unification: also test `CreateTimely` here.
            any::<RunIngestionCommand>()
                .prop_map(StorageCommand::RunIngestion)
                .boxed(),
            any::<RunSinkCommand<mz_repr::Timestamp>>()
                .prop_map(StorageCommand::RunSink)
                .boxed(),
            (any::<GlobalId>(), any_antichain())
                .prop_map(|(id, frontier)| StorageCommand::AllowCompaction(id, frontier))
                .boxed(),
        ])
    }
}

/// A "kind" enum for statuses tracked by the health operator.
#[derive(Copy, Clone, Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub enum Status {
    Starting,
    Running,
    Paused,
    Stalled,
    /// This status is currently unused.
    // re-design the ceased status
    Ceased,
    Dropped,
}

impl std::str::FromStr for Status {
    type Err = anyhow::Error;
    /// Keep in sync with [`Status::to_str`].
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        Ok(match s {
            "starting" => Status::Starting,
            "running" => Status::Running,
            "paused" => Status::Paused,
            "stalled" => Status::Stalled,
            "ceased" => Status::Ceased,
            "dropped" => Status::Dropped,
            s => return Err(anyhow::anyhow!("{} is not a valid status", s)),
        })
    }
}

impl Status {
    /// Keep in sync with `Status::from_str`.
    pub fn to_str(&self) -> &'static str {
        match self {
            Status::Starting => "starting",
            Status::Running => "running",
            Status::Paused => "paused",
            Status::Stalled => "stalled",
            Status::Ceased => "ceased",
            Status::Dropped => "dropped",
        }
    }

    /// Determines if a new status should be produced in the context of a previous
    /// status.
    pub fn superseded_by(self, new: Status) -> bool {
        match (self, new) {
            (_, Status::Dropped) => true,
            (Status::Dropped, _) => false,
            // Don't re-mark the object as paused.
            (Status::Paused, Status::Paused) => false,
            // De-duplication of other statuses is currently managed by the
            // `health_operator`.
            _ => true,
        }
    }
}
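
// An illustrative sketch of the status transition rules above: `Dropped` supersedes
// every other status, nothing supersedes `Dropped`, and repeated `Paused` updates are
// suppressed. It also shows that `to_str` and `FromStr` stay in sync.
#[cfg(test)]
#[mz_ore::test]
fn status_transition_examples() {
    assert!(Status::Running.superseded_by(Status::Dropped));
    assert!(!Status::Dropped.superseded_by(Status::Running));
    assert!(!Status::Paused.superseded_by(Status::Paused));
    assert_eq!("stalled".parse::<Status>().unwrap(), Status::Stalled);
    assert_eq!(Status::Stalled.to_str(), "stalled");
}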

/// A source or sink status update.
///
/// Represents a status update for a given object. The contents should be able to be packed into
/// a status row that conforms to the schema of the object's status history relation.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct StatusUpdate {
    pub id: GlobalId,
    pub status: Status,
    pub timestamp: chrono::DateTime<chrono::Utc>,
    pub error: Option<String>,
    pub hints: BTreeSet<String>,
    pub namespaced_errors: BTreeMap<String, String>,
    pub replica_id: Option<ReplicaId>,
}

impl StatusUpdate {
    pub fn new(
        id: GlobalId,
        timestamp: chrono::DateTime<chrono::Utc>,
        status: Status,
    ) -> StatusUpdate {
        StatusUpdate {
            id,
            timestamp,
            status,
            error: None,
            hints: Default::default(),
            namespaced_errors: Default::default(),
            replica_id: None,
        }
    }
}
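
// A small illustrative sketch of `StatusUpdate::new`: the constructor fills the optional
// error, hints, namespaced errors, and replica id with empty defaults, which callers can
// then override field by field. The id used here is an arbitrary placeholder.
#[cfg(test)]
#[mz_ore::test]
fn status_update_new_example() {
    let update = StatusUpdate::new(GlobalId::User(1), chrono::Utc::now(), Status::Running);
    assert_eq!(update.status, Status::Running);
    assert!(update.error.is_none());
    assert!(update.hints.is_empty());
    assert!(update.namespaced_errors.is_empty());
    assert!(update.replica_id.is_none());
}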

impl From<StatusUpdate> for Row {
    fn from(update: StatusUpdate) -> Self {
        use mz_repr::Datum;

        let timestamp = Datum::TimestampTz(update.timestamp.try_into().expect("must fit"));
        let id = update.id.to_string();
        let id = Datum::String(&id);
        let status = Datum::String(update.status.to_str());
        let error = update.error.as_deref().into();

        let mut row = Row::default();
        let mut packer = row.packer();
        packer.extend([timestamp, id, status, error]);

        if !update.hints.is_empty() || !update.namespaced_errors.is_empty() {
            packer.push_dict_with(|dict_packer| {
                // The `hints` and `namespaced` keys are ordered,
                // as are the BTrees they each contain.
                if !update.hints.is_empty() {
                    dict_packer.push(Datum::String("hints"));
                    dict_packer.push_list(update.hints.iter().map(|s| Datum::String(s)));
                }
                if !update.namespaced_errors.is_empty() {
                    dict_packer.push(Datum::String("namespaced"));
                    dict_packer.push_dict(
                        update
                            .namespaced_errors
                            .iter()
                            .map(|(k, v)| (k.as_str(), Datum::String(v))),
                    );
                }
            });
        } else {
            packer.push(Datum::Null);
        }

        match update.replica_id {
            Some(id) => packer.push(Datum::String(&id.to_string())),
            None => packer.push(Datum::Null),
        }

        row
    }
}
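
// An illustrative sketch of the `Row` packing above: every status row carries six datums
// (timestamp, id, status, error, details, replica_id), with `Datum::Null` standing in for
// the absent details and replica id of a freshly constructed update.
#[cfg(test)]
#[mz_ore::test]
fn status_update_row_shape_example() {
    use mz_repr::Datum;

    let update = StatusUpdate::new(GlobalId::User(1), chrono::Utc::now(), Status::Paused);
    let row = Row::from(update);
    let datums: Vec<_> = row.iter().collect();
    assert_eq!(datums.len(), 6);
    assert_eq!(datums[2], Datum::String("paused"));
    assert_eq!(datums[3], Datum::Null);
}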

impl RustType<proto_storage_response::ProtoStatus> for Status {
    fn into_proto(&self) -> proto_storage_response::ProtoStatus {
        use proto_storage_response::proto_status::*;

        proto_storage_response::ProtoStatus {
            kind: Some(match self {
                Status::Starting => Kind::Starting(()),
                Status::Running => Kind::Running(()),
                Status::Paused => Kind::Paused(()),
                Status::Stalled => Kind::Stalled(()),
                Status::Ceased => Kind::Ceased(()),
                Status::Dropped => Kind::Dropped(()),
            }),
        }
    }

    fn from_proto(proto: proto_storage_response::ProtoStatus) -> Result<Self, TryFromProtoError> {
        use proto_storage_response::proto_status::*;
        let kind = proto
            .kind
            .ok_or_else(|| TryFromProtoError::missing_field("ProtoStatus::kind"))?;

        Ok(match kind {
            Kind::Starting(()) => Status::Starting,
            Kind::Running(()) => Status::Running,
            Kind::Paused(()) => Status::Paused,
            Kind::Stalled(()) => Status::Stalled,
            Kind::Ceased(()) => Status::Ceased,
            Kind::Dropped(()) => Status::Dropped,
        })
    }
}

impl RustType<proto_storage_response::ProtoStatusUpdate> for StatusUpdate {
    fn into_proto(&self) -> proto_storage_response::ProtoStatusUpdate {
        proto_storage_response::ProtoStatusUpdate {
            id: Some(self.id.into_proto()),
            status: Some(self.status.into_proto()),
            timestamp: Some(self.timestamp.into_proto()),
            error: self.error.clone(),
            hints: self.hints.iter().cloned().collect(),
            namespaced_errors: self.namespaced_errors.clone(),
            replica_id: self.replica_id.map(|id| id.to_string().into_proto()),
        }
    }

    fn from_proto(
        proto: proto_storage_response::ProtoStatusUpdate,
    ) -> Result<Self, TryFromProtoError> {
        Ok(StatusUpdate {
            id: proto.id.into_rust_if_some("ProtoStatusUpdate::id")?,
            timestamp: proto
                .timestamp
                .into_rust_if_some("ProtoStatusUpdate::timestamp")?,
            status: proto
                .status
                .into_rust_if_some("ProtoStatusUpdate::status")?,
            error: proto.error,
            hints: proto.hints.into_iter().collect(),
            namespaced_errors: proto.namespaced_errors,
            replica_id: proto
                .replica_id
                .map(|replica_id: String| replica_id.parse().expect("must be a valid replica id")),
        })
    }
}

/// An update to an append-only collection.
pub enum AppendOnlyUpdate {
    Row((Row, Diff)),
    Status(StatusUpdate),
}

impl AppendOnlyUpdate {
    pub fn into_row(self) -> (Row, Diff) {
        match self {
            AppendOnlyUpdate::Row((row, diff)) => (row, diff),
            AppendOnlyUpdate::Status(status) => (Row::from(status), Diff::ONE),
        }
    }
}

impl From<(Row, Diff)> for AppendOnlyUpdate {
    fn from((row, diff): (Row, Diff)) -> Self {
        Self::Row((row, diff))
    }
}

impl From<StatusUpdate> for AppendOnlyUpdate {
    fn from(update: StatusUpdate) -> Self {
        Self::Status(update)
    }
}
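
// A brief illustrative sketch of the `AppendOnlyUpdate` conversions: plain row updates
// pass through `into_row` unchanged, while status updates are packed into a `Row` with a
// diff of one.
#[cfg(test)]
#[mz_ore::test]
fn append_only_update_example() {
    let row = Row::default();
    let update = AppendOnlyUpdate::from((row.clone(), Diff::ONE));
    assert_eq!(update.into_row(), (row, Diff::ONE));

    let status = StatusUpdate::new(GlobalId::User(1), chrono::Utc::now(), Status::Running);
    let (_row, diff) = AppendOnlyUpdate::from(status).into_row();
    assert_eq!(diff, Diff::ONE);
}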

/// Responses that the storage nature of a worker/dataflow can provide back to the coordinator.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub enum StorageResponse<T = mz_repr::Timestamp> {
    /// A new upper frontier for the specified identifier.
    FrontierUpper(GlobalId, Antichain<T>),
    /// Punctuation indicates that no more responses will be transmitted for the specified id.
    DroppedId(GlobalId),
    /// Batches that have been staged in Persist and maybe will be linked into a shard.
    StagedBatches(BTreeMap<uuid::Uuid, Vec<Result<ProtoBatch, String>>>),
    /// A list of statistics updates for sources and sinks.
    StatisticsUpdates(Vec<SourceStatisticsUpdate>, Vec<SinkStatisticsUpdate>),
    /// A status update for a source or a sink. Periodically sent from
    /// storage workers to convey the latest status information about an object.
    StatusUpdate(StatusUpdate),
}

impl RustType<ProtoStorageResponse> for StorageResponse<mz_repr::Timestamp> {
    fn into_proto(&self) -> ProtoStorageResponse {
        use proto_storage_response::Kind::*;
        use proto_storage_response::{
            ProtoDroppedId, ProtoFrontierUpper, ProtoStagedBatches, ProtoStatisticsUpdates,
        };
        ProtoStorageResponse {
            kind: Some(match self {
                StorageResponse::FrontierUpper(id, upper) => FrontierUpper(ProtoFrontierUpper {
                    id: Some(id.into_proto()),
                    upper: Some(upper.into_proto()),
                }),
                StorageResponse::DroppedId(id) => DroppedId(ProtoDroppedId {
                    id: Some(id.into_proto()),
                }),
                StorageResponse::StatisticsUpdates(source_stats, sink_stats) => {
                    Stats(ProtoStatisticsUpdates {
                        source_updates: source_stats
                            .iter()
                            .map(|update| update.into_proto())
                            .collect(),
                        sink_updates: sink_stats
                            .iter()
                            .map(|update| update.into_proto())
                            .collect(),
                    })
                }
                StorageResponse::StatusUpdate(update) => StatusUpdate(update.into_proto()),
                StorageResponse::StagedBatches(staged) => {
                    let batches = staged
                        .into_iter()
                        .map(|(collection_id, batches)| {
                            let batches = batches
                                .into_iter()
                                .map(|result| {
                                    use proto_storage_response::proto_staged_batches::batch_result::Value;
                                    let value = match result {
                                        Ok(batch) => Value::Batch(batch.clone()),
                                        Err(err) => Value::Error(err.clone()),
                                    };
                                    proto_storage_response::proto_staged_batches::BatchResult { value: Some(value) }
                                })
                                .collect();
                            proto_storage_response::proto_staged_batches::Inner {
                                id: Some(collection_id.into_proto()),
                                batches,
                            }
                        })
                        .collect();
                    StagedBatches(ProtoStagedBatches { batches })
                }
            }),
        }
    }

    fn from_proto(proto: ProtoStorageResponse) -> Result<Self, TryFromProtoError> {
        use proto_storage_response::Kind::*;
        use proto_storage_response::{ProtoDroppedId, ProtoFrontierUpper};
        match proto.kind {
            Some(DroppedId(ProtoDroppedId { id })) => Ok(StorageResponse::DroppedId(
                id.into_rust_if_some("ProtoDroppedId::id")?,
            )),
            Some(FrontierUpper(ProtoFrontierUpper { id, upper })) => {
                Ok(StorageResponse::FrontierUpper(
                    id.into_rust_if_some("ProtoFrontierUpper::id")?,
                    upper.into_rust_if_some("ProtoFrontierUpper::upper")?,
                ))
            }
            Some(Stats(stats)) => Ok(StorageResponse::StatisticsUpdates(
                stats
                    .source_updates
                    .into_iter()
                    .map(|update| update.into_rust())
                    .collect::<Result<Vec<_>, TryFromProtoError>>()?,
                stats
                    .sink_updates
                    .into_iter()
                    .map(|update| update.into_rust())
                    .collect::<Result<Vec<_>, TryFromProtoError>>()?,
            )),
            Some(StatusUpdate(update)) => Ok(StorageResponse::StatusUpdate(update.into_rust()?)),
            Some(StagedBatches(staged)) => {
                let batches: BTreeMap<_, _> = staged
                    .batches
                    .into_iter()
                    .map(|inner| {
                        let id = inner
                            .id
                            .into_rust_if_some("ProtoStagedBatches::Inner::id")?;

                        let mut batches = Vec::with_capacity(inner.batches.len());
                        for maybe_batch in inner.batches {
                            use proto_storage_response::proto_staged_batches::batch_result::Value;

                            let value = maybe_batch.value.ok_or_else(|| {
                                TryFromProtoError::missing_field("BatchResult::value")
                            })?;
                            let batch = match value {
                                Value::Batch(batch) => Ok(batch),
                                Value::Error(err) => Err(err),
                            };
                            batches.push(batch);
                        }

                        Ok::<_, TryFromProtoError>((id, batches))
                    })
                    .collect::<Result<_, _>>()?;

                Ok(StorageResponse::StagedBatches(batches))
            }
            None => Err(TryFromProtoError::missing_field(
                "ProtoStorageResponse::kind",
            )),
        }
    }
}

impl Arbitrary for StorageResponse<mz_repr::Timestamp> {
    type Strategy = Union<BoxedStrategy<Self>>;
    type Parameters = ();

    fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
        // TODO(guswynn): test `SourceStatisticsUpdates`
        Union::new(vec![
            (any::<GlobalId>(), any_antichain())
                .prop_map(|(id, upper)| StorageResponse::FrontierUpper(id, upper))
                .boxed(),
        ])
    }
}

/// Maintained state for partitioned storage clients.
///
/// This helper type unifies the responses of multiple partitioned
/// workers in order to present as a single worker.
#[derive(Debug)]
pub struct PartitionedStorageState<T> {
    /// Number of partitions the state machine represents.
    parts: usize,
    /// Upper frontiers for sources and sinks, both unioned across all partitions and from each
    /// individual partition.
    uppers: BTreeMap<GlobalId, (MutableAntichain<T>, Vec<Option<Antichain<T>>>)>,
    /// Staged batches from oneshot sources that will get appended by `environmentd`.
    oneshot_source_responses:
        BTreeMap<uuid::Uuid, BTreeMap<usize, Vec<Result<ProtoBatch, String>>>>,
}

impl<T> Partitionable<StorageCommand<T>, StorageResponse<T>>
    for (StorageCommand<T>, StorageResponse<T>)
where
    T: timely::progress::Timestamp + Lattice,
{
    type PartitionedState = PartitionedStorageState<T>;

    fn new(parts: usize) -> PartitionedStorageState<T> {
        PartitionedStorageState {
            parts,
            uppers: BTreeMap::new(),
            oneshot_source_responses: BTreeMap::new(),
        }
    }
}

impl<T> PartitionedStorageState<T>
where
    T: timely::progress::Timestamp,
{
    fn observe_command(&mut self, command: &StorageCommand<T>) {
        // Note that `observe_command` is quite different in `mz_compute_client`.
        // Compute (currently) only sends the command to 1 process,
        // but storage fans out to all workers, allowing the storage processes
        // to self-coordinate how commands and internal commands are ordered.
        //
        // TODO(guswynn): cluster-unification: consolidate this with compute.
        let _ = match command {
            StorageCommand::CreateTimely { .. } => {
                // Similarly, we don't reset state here like compute, because,
                // until we are required to manage multiple replicas, we can handle
                // keeping track of state across restarts of storage server(s).
            }
            StorageCommand::RunIngestion(ingestion) => {
                self.insert_new_uppers(ingestion.description.collection_ids());
            }
            StorageCommand::RunSink(export) => {
                self.insert_new_uppers([export.id]);
            }
            StorageCommand::InitializationComplete
            | StorageCommand::AllowWrites
            | StorageCommand::UpdateConfiguration(_)
            | StorageCommand::AllowCompaction(_, _)
            | StorageCommand::RunOneshotIngestion(_)
            | StorageCommand::CancelOneshotIngestion { .. } => {}
        };
    }

    /// Starts tracking upper frontiers for the given IDs. This is the shared implementation
    /// for commands that install storage objects.
    ///
    /// IDs that are already tracked in `self` keep their existing state and are skipped.
    fn insert_new_uppers<I: IntoIterator<Item = GlobalId>>(&mut self, ids: I) {
        for id in ids {
            self.uppers.entry(id).or_insert_with(|| {
                let mut frontier = MutableAntichain::new();
                // TODO(guswynn): cluster-unification: fix this dangerous use of `as`, by
                // merging the types that compute and storage use.
                #[allow(clippy::as_conversions)]
                frontier.update_iter(iter::once((T::minimum(), self.parts as i64)));
                let part_frontiers = vec![Some(Antichain::from_elem(T::minimum())); self.parts];

                (frontier, part_frontiers)
            });
        }
    }
}

impl<T> PartitionedState<StorageCommand<T>, StorageResponse<T>> for PartitionedStorageState<T>
where
    T: timely::progress::Timestamp + Lattice,
{
    fn split_command(&mut self, command: StorageCommand<T>) -> Vec<Option<StorageCommand<T>>> {
        self.observe_command(&command);

        match command {
            StorageCommand::CreateTimely { config, epoch } => {
                let timely_cmds = config.split_command(self.parts);

                let timely_cmds = timely_cmds
                    .into_iter()
                    .map(|config| Some(StorageCommand::CreateTimely { config, epoch }))
                    .collect();
                timely_cmds
            }
            command => {
                // Fan out to all processes (which will fan out to all workers).
                // StorageState manages ordering of commands internally.
                vec![Some(command); self.parts]
            }
        }
    }

    fn absorb_response(
        &mut self,
        shard_id: usize,
        response: StorageResponse<T>,
    ) -> Option<Result<StorageResponse<T>, anyhow::Error>> {
        match response {
            // Avoid multiple retractions of minimum time, to present as updates from one worker.
            StorageResponse::FrontierUpper(id, new_shard_upper) => {
                let (frontier, shard_frontiers) = match self.uppers.get_mut(&id) {
                    Some(value) => value,
                    None => panic!("Reference to absent collection: {id}"),
                };
                let old_upper = frontier.frontier().to_owned();
                let shard_upper = match &mut shard_frontiers[shard_id] {
                    Some(shard_upper) => shard_upper,
                    None => panic!("Reference to absent shard {shard_id} for collection {id}"),
                };
                frontier.update_iter(shard_upper.iter().map(|t| (t.clone(), -1)));
                frontier.update_iter(new_shard_upper.iter().map(|t| (t.clone(), 1)));
                shard_upper.join_assign(&new_shard_upper);

                let new_upper = frontier.frontier();
                if PartialOrder::less_than(&old_upper.borrow(), &new_upper) {
                    Some(Ok(StorageResponse::FrontierUpper(id, new_upper.to_owned())))
                } else {
                    None
                }
            }
            StorageResponse::DroppedId(id) => {
                let (_, shard_frontiers) = match self.uppers.get_mut(&id) {
                    Some(value) => value,
                    None => panic!("Reference to absent collection: {id}"),
                };
                let prev = shard_frontiers[shard_id].take();
                assert!(
                    prev.is_some(),
                    "got double drop for {id} from shard {shard_id}"
                );

                if shard_frontiers.iter().all(Option::is_none) {
                    self.uppers.remove(&id);
                    Some(Ok(StorageResponse::DroppedId(id)))
                } else {
                    None
                }
            }
            StorageResponse::StatisticsUpdates(source_stats, sink_stats) => {
                // Just forward it along; the `worker_id` should have been set in `storage_state`.
                // We _could_ consolidate across worker_ids here, but each worker only produces
                // responses periodically, so we avoid that complexity.
                Some(Ok(StorageResponse::StatisticsUpdates(
                    source_stats,
                    sink_stats,
                )))
            }
            StorageResponse::StatusUpdate(updates) => {
                Some(Ok(StorageResponse::StatusUpdate(updates)))
            }
            StorageResponse::StagedBatches(batches) => {
                let mut finished_batches = BTreeMap::new();

                for (collection_id, batches) in batches {
                    tracing::info!(%shard_id, %collection_id, "got batch");

                    let entry = self
                        .oneshot_source_responses
                        .entry(collection_id)
                        .or_default();
                    let novel = entry.insert(shard_id, batches);
                    assert_none!(novel, "Duplicate oneshot source response");

                    // Check if we've received responses from all shards.
                    if entry.len() == self.parts {
                        let entry = self
                            .oneshot_source_responses
                            .remove(&collection_id)
                            .expect("checked above");
                        let all_batches: Vec<_> = entry.into_values().flatten().collect();

                        finished_batches.insert(collection_id, all_batches);
                    }
                }

                if !finished_batches.is_empty() {
                    Some(Ok(StorageResponse::StagedBatches(finished_batches)))
                } else {
                    None
                }
            }
        }
    }
}
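
// An illustrative sketch of the fan-out behavior of `split_command`: every command other
// than `CreateTimely` is broadcast unchanged to all partitions, leaving the storage
// processes to coordinate ordering among themselves. The local type alias is only there
// to keep the trait invocation readable.
#[cfg(test)]
#[mz_ore::test]
fn split_command_fans_out_example() {
    type Timestamped = (
        StorageCommand<mz_repr::Timestamp>,
        StorageResponse<mz_repr::Timestamp>,
    );
    let mut state = <Timestamped as Partitionable<
        StorageCommand<mz_repr::Timestamp>,
        StorageResponse<mz_repr::Timestamp>,
    >>::new(3);

    let parts = state.split_command(StorageCommand::AllowWrites);
    assert_eq!(parts.len(), 3);
    assert!(
        parts
            .iter()
            .all(|cmd| cmd == &Some(StorageCommand::AllowWrites))
    );
}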

#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
/// A batch of updates to be fed to a local input.
pub struct Update<T = mz_repr::Timestamp> {
    pub row: Row,
    pub timestamp: T,
    pub diff: Diff,
}

#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
/// A batch of updates to be fed to a local input; however, the input must
/// determine the most appropriate timestamps to use.
///
/// TODO(cf2): Can we remove this and use only [`TableData`]?
pub struct TimestamplessUpdate {
    pub row: Row,
    pub diff: Diff,
}

#[derive(Debug, Clone, PartialEq)]
pub enum TableData {
    /// Rows that still need to be persisted and appended.
    ///
    /// The contained [`Row`]s are _not_ consolidated.
    Rows(Vec<(Row, Diff)>),
    /// Batches already staged in Persist, ready to be appended.
    Batches(SmallVec<[ProtoBatch; 1]>),
}

impl TableData {
    pub fn is_empty(&self) -> bool {
        match self {
            TableData::Rows(rows) => rows.is_empty(),
            TableData::Batches(batches) => batches.is_empty(),
        }
    }
}
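
// A minimal illustrative sketch of `TableData::is_empty`: row-based data is empty exactly
// when it holds no updates, and batch-based data is empty when no staged batches remain.
#[cfg(test)]
#[mz_ore::test]
fn table_data_is_empty_example() {
    assert!(TableData::Rows(Vec::new()).is_empty());
    assert!(!TableData::Rows(vec![(Row::default(), Diff::ONE)]).is_empty());
    assert!(TableData::Batches(SmallVec::new()).is_empty());
}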

/// A collection of timestamp-less updates. As updates are added to the builder
/// they are automatically spilled to blob storage.
pub struct TimestamplessUpdateBuilder<K, V, T, D>
where
    K: Codec,
    V: Codec,
    T: Timestamp + Lattice + Codec64,
    D: Codec64,
{
    builder: BatchBuilder<K, V, T, D>,
    initial_ts: T,
}

impl<K, V, T, D> TimestamplessUpdateBuilder<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: TimestampManipulation + Lattice + Codec64 + Sync,
    D: Semigroup + Ord + Codec64 + Send + Sync,
{
    /// Create a new [`TimestamplessUpdateBuilder`] for the shard associated
    /// with the provided [`WriteHandle`].
    pub fn new(handle: &WriteHandle<K, V, T, D>) -> Self {
        let initial_ts = T::minimum();
        let builder = handle.builder(Antichain::from_elem(initial_ts.clone()));
        TimestamplessUpdateBuilder {
            builder,
            initial_ts,
        }
    }

    /// Add a `(K, V, D)` to the staged batch.
    pub async fn add(&mut self, k: &K, v: &V, d: &D) {
        self.builder
            .add(k, v, &self.initial_ts, d)
            .await
            .expect("invalid Persist usage");
    }

    /// Finish the builder and return a [`ProtoBatch`] which can later be linked into a shard.
    ///
    /// The returned batch has nonsensical lower and upper bounds and must be re-written before
    /// appending into the destination shard.
    pub async fn finish(self) -> ProtoBatch {
        let finish_ts = StepForward::step_forward(&self.initial_ts);
        let batch = self
            .builder
            .finish(Antichain::from_elem(finish_ts))
            .await
            .expect("invalid Persist usage");

        batch.into_transmittable_batch()
    }
}
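
// A hedged usage sketch for `TimestamplessUpdateBuilder`: given a `WriteHandle` for some
// shard, stage a set of updates at a placeholder timestamp and obtain a `ProtoBatch` that
// another component can later re-timestamp and link into the shard. The helper name and
// its parameters are illustrative, not part of the API above.
#[cfg(test)]
#[allow(dead_code)]
async fn stage_updates_sketch<K, V, T, D>(
    handle: &WriteHandle<K, V, T, D>,
    updates: &[(K, V, D)],
) -> ProtoBatch
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: TimestampManipulation + Lattice + Codec64 + Sync,
    D: Semigroup + Ord + Codec64 + Send + Sync,
{
    let mut builder = TimestamplessUpdateBuilder::new(handle);
    for (k, v, d) in updates {
        // Updates are staged at `T::minimum()`; the resulting batch must be rewritten
        // with real timestamps before it is appended to the destination shard.
        builder.add(k, v, d).await;
    }
    builder.finish().await
}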

impl RustType<ProtoCompaction> for (GlobalId, Antichain<mz_repr::Timestamp>) {
    fn into_proto(&self) -> ProtoCompaction {
        ProtoCompaction {
            id: Some(self.0.into_proto()),
            frontier: Some(self.1.into_proto()),
        }
    }

    fn from_proto(proto: ProtoCompaction) -> Result<Self, TryFromProtoError> {
        Ok((
            proto.id.into_rust_if_some("ProtoCompaction::id")?,
            proto
                .frontier
                .into_rust_if_some("ProtoCompaction::frontier")?,
        ))
    }
}

impl TryIntoTimelyConfig for StorageCommand {
    fn try_into_timely_config(self) -> Result<(TimelyConfig, ClusterStartupEpoch), Self> {
        match self {
            StorageCommand::CreateTimely { config, epoch } => Ok((config, epoch)),
            cmd => Err(cmd),
        }
    }
}

#[cfg(test)]
mod tests {
    use mz_ore::assert_ok;
    use mz_proto::protobuf_roundtrip;
    use proptest::prelude::ProptestConfig;
    use proptest::proptest;

    use super::*;

    proptest! {
        #![proptest_config(ProptestConfig::with_cases(32))]

        #[mz_ore::test]
        #[cfg_attr(miri, ignore)] // too slow
        fn storage_command_protobuf_roundtrip(expect in any::<StorageCommand<mz_repr::Timestamp>>() ) {
            let actual = protobuf_roundtrip::<_, ProtoStorageCommand>(&expect);
            assert_ok!(actual);
            assert_eq!(actual.unwrap(), expect);
        }

        #[mz_ore::test]
        #[cfg_attr(miri, ignore)] // too slow
        fn storage_response_protobuf_roundtrip(expect in any::<StorageResponse<mz_repr::Timestamp>>() ) {
            let actual = protobuf_roundtrip::<_, ProtoStorageResponse>(&expect);
            assert_ok!(actual);
            assert_eq!(actual.unwrap(), expect);
        }
    }
}
1127}