mz_storage_client/
client.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10#![allow(missing_docs)]
11// Tonic generates code that violates clippy lints.
12// TODO: Remove this once tonic does not produce this code anymore.
13#![allow(clippy::as_conversions, clippy::clone_on_ref_ptr)]
14
15//! The public API of the storage layer.
16
17use std::collections::{BTreeMap, BTreeSet};
18use std::fmt::Debug;
19use std::iter;
20
21use async_trait::async_trait;
22use differential_dataflow::difference::Semigroup;
23use differential_dataflow::lattice::Lattice;
24use mz_cluster_client::ReplicaId;
25use mz_cluster_client::client::{ClusterStartupEpoch, TimelyConfig, TryIntoTimelyConfig};
26use mz_ore::assert_none;
27use mz_persist_client::batch::{BatchBuilder, ProtoBatch};
28use mz_persist_client::write::WriteHandle;
29use mz_persist_types::{Codec, Codec64, StepForward};
30use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError};
31use mz_repr::{Diff, GlobalId, Row, TimestampManipulation};
32use mz_service::client::{GenericClient, Partitionable, PartitionedState};
33use mz_service::grpc::{GrpcClient, GrpcServer, ProtoServiceTypes, ResponseStream};
34use mz_storage_types::controller::CollectionMetadata;
35use mz_storage_types::oneshot_sources::OneshotIngestionRequest;
36use mz_storage_types::parameters::StorageParameters;
37use mz_storage_types::sinks::StorageSinkDesc;
38use mz_storage_types::sources::IngestionDescription;
39use mz_timely_util::progress::any_antichain;
40use proptest::prelude::{Arbitrary, any};
41use proptest::strategy::{BoxedStrategy, Strategy, Union};
42use serde::{Deserialize, Serialize};
43use smallvec::SmallVec;
44use timely::PartialOrder;
45use timely::progress::Timestamp;
46use timely::progress::frontier::{Antichain, MutableAntichain};
47use tonic::{Request, Status as TonicStatus, Streaming};
48use uuid::Uuid;
49
50use crate::client::proto_storage_server::ProtoStorage;
51use crate::metrics::ReplicaMetrics;
52use crate::statistics::{SinkStatisticsUpdate, SourceStatisticsUpdate};
53
54include!(concat!(env!("OUT_DIR"), "/mz_storage_client.client.rs"));
55
/// A client to a storage server.
///
/// This trait declares no methods of its own: it names the combination of
/// [`GenericClient`] with [`StorageCommand`] as the command type and
/// [`StorageResponse`] as the response type. The timestamp type `T` defaults
/// to [`mz_repr::Timestamp`].
pub trait StorageClient<T = mz_repr::Timestamp>:
    GenericClient<StorageCommand<T>, StorageResponse<T>>
{
}
61
62impl<C, T> StorageClient<T> for C where C: GenericClient<StorageCommand<T>, StorageResponse<T>> {}
63
64#[async_trait]
65impl<T: Send> GenericClient<StorageCommand<T>, StorageResponse<T>> for Box<dyn StorageClient<T>> {
66    async fn send(&mut self, cmd: StorageCommand<T>) -> Result<(), anyhow::Error> {
67        (**self).send(cmd).await
68    }
69
70    /// # Cancel safety
71    ///
72    /// This method is cancel safe. If `recv` is used as the event in a [`tokio::select!`]
73    /// statement and some other branch completes first, it is guaranteed that no messages were
74    /// received by this client.
75    async fn recv(&mut self) -> Result<Option<StorageResponse<T>>, anyhow::Error> {
76        // `GenericClient::recv` is required to be cancel safe.
77        (**self).recv().await
78    }
79}
80
/// Type-level tag for the storage protocol.
///
/// The enum has no variants, so no value of it can be constructed; in this file
/// it is used only as the type parameter of [`StorageGrpcClient`] and as the
/// implementor of [`ProtoServiceTypes`].
#[derive(Debug, Clone)]
pub enum StorageProtoServiceTypes {}
83
// Binds the storage protocol's protobuf message types, metrics type, and RPC
// path to the `StorageProtoServiceTypes` tag.
impl ProtoServiceTypes for StorageProtoServiceTypes {
    // The protobuf encoding of `StorageCommand` (see the `RustType` impl in this file).
    type PC = ProtoStorageCommand;
    // The protobuf encoding of `StorageResponse` (see the `RustType` impl in this file).
    type PR = ProtoStorageResponse;
    // Metrics type associated with this service.
    type STATS = ReplicaMetrics;
    // NOTE(review): presumably this must match the route tonic generates for
    // `ProtoStorage::command_response_stream` — confirm against the generated code.
    const URL: &'static str = "/mz_storage_client.client.ProtoStorage/CommandResponseStream";
}
90
/// A [`GrpcClient`] specialized to the storage protocol via [`StorageProtoServiceTypes`].
pub type StorageGrpcClient = GrpcClient<StorageProtoServiceTypes>;
92
93#[async_trait]
94impl<F, G> ProtoStorage for GrpcServer<F>
95where
96    F: Fn() -> G + Send + Sync + 'static,
97    G: StorageClient + 'static,
98{
99    type CommandResponseStreamStream = ResponseStream<ProtoStorageResponse>;
100
101    async fn command_response_stream(
102        &self,
103        request: Request<Streaming<ProtoStorageCommand>>,
104    ) -> Result<tonic::Response<Self::CommandResponseStreamStream>, TonicStatus> {
105        self.forward_bidi_stream(request).await
106    }
107}
108
/// Commands related to the ingress and egress of collections.
///
/// The timestamp type `T` defaults to [`mz_repr::Timestamp`]; the protobuf
/// conversion in this file is implemented only for that default.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub enum StorageCommand<T = mz_repr::Timestamp> {
    /// Specifies to the storage server(s) the shape of the timely cluster
    /// we want created, before other commands are sent.
    CreateTimely {
        /// The configuration of the timely cluster to create.
        config: TimelyConfig,
        /// The startup epoch sent along with the configuration.
        epoch: ClusterStartupEpoch,
    },
    /// Indicates that the controller has sent all commands reflecting its
    /// initial state.
    InitializationComplete,
    /// `AllowWrites` informs the replica that it can transition out of the
    /// read-only stage and into the read-write computation stage.
    /// It is now allowed to affect changes to external systems (writes).
    ///
    /// See `ComputeCommand::AllowWrites` for details. This command works
    /// analogously to the compute version.
    AllowWrites,
    /// Update storage instance configuration.
    UpdateConfiguration(StorageParameters),
    /// Run the enumerated sources, each associated with its identifier.
    RunIngestions(Vec<RunIngestionCommand>),
    /// Enable compaction in storage-managed collections.
    ///
    /// Each entry in the vector names a collection and provides a frontier after which
    /// accumulations must be correct.
    AllowCompaction(Vec<(GlobalId, Antichain<T>)>),
    /// Run the enumerated sinks, each associated with its identifier.
    RunSinks(Vec<RunSinkCommand<T>>),
    /// Run a dataflow which will ingest data from an external source and only __stage__ it in
    /// Persist.
    ///
    /// Unlike regular ingestions/sources, some other component (e.g. `environmentd`) is
    /// responsible for linking the staged data into a shard.
    RunOneshotIngestion(Vec<RunOneshotIngestion>),
    /// `CancelOneshotIngestion` instructs the replica to cancel the identified oneshot ingestions.
    ///
    /// It is invalid to send a [`CancelOneshotIngestion`] command that references a oneshot
    /// ingestion that was not created by a corresponding [`RunOneshotIngestion`] command before.
    /// Doing so may cause the replica to exhibit undefined behavior.
    ///
    /// [`CancelOneshotIngestion`]: crate::client::StorageCommand::CancelOneshotIngestion
    /// [`RunOneshotIngestion`]: crate::client::StorageCommand::RunOneshotIngestion
    CancelOneshotIngestion {
        /// The IDs of the oneshot ingestions to cancel.
        ingestions: Vec<Uuid>,
    },
}
156
157impl<T> StorageCommand<T> {
158    /// Returns whether this command instructs the installation of storage objects.
159    pub fn installs_objects(&self) -> bool {
160        use StorageCommand::*;
161        match self {
162            CreateTimely { .. }
163            | InitializationComplete
164            | AllowWrites
165            | UpdateConfiguration(_)
166            | AllowCompaction(_)
167            | CancelOneshotIngestion { .. } => false,
168            // TODO(cf2): multi-replica oneshot ingestions. At the moment returning
169            // true here means we can't run `COPY FROM` on multi-replica clusters, this
170            // should be easy enough to support though.
171            RunIngestions(_) | RunSinks(_) | RunOneshotIngestion(_) => true,
172        }
173    }
174}
175
/// A command that starts ingesting the given ingestion description.
///
/// Carried by [`StorageCommand::RunIngestions`].
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct RunIngestionCommand {
    /// The id of the storage collection being ingested.
    pub id: GlobalId,
    /// The description of what source type should be ingested and what post-processing steps must
    /// be applied to the data before writing them down into the storage collection.
    pub description: IngestionDescription<CollectionMetadata>,
}
185
186impl Arbitrary for RunIngestionCommand {
187    type Strategy = BoxedStrategy<Self>;
188    type Parameters = ();
189
190    fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
191        (
192            any::<GlobalId>(),
193            any::<IngestionDescription<CollectionMetadata>>(),
194        )
195            .prop_map(|(id, description)| Self { id, description })
196            .boxed()
197    }
198}
199
200impl RustType<ProtoRunIngestionCommand> for RunIngestionCommand {
201    fn into_proto(&self) -> ProtoRunIngestionCommand {
202        ProtoRunIngestionCommand {
203            id: Some(self.id.into_proto()),
204            description: Some(self.description.into_proto()),
205        }
206    }
207
208    fn from_proto(proto: ProtoRunIngestionCommand) -> Result<Self, TryFromProtoError> {
209        Ok(RunIngestionCommand {
210            id: proto.id.into_rust_if_some("ProtoRunIngestionCommand::id")?,
211            description: proto
212                .description
213                .into_rust_if_some("ProtoRunIngestionCommand::description")?,
214        })
215    }
216}
217
/// A command that starts a oneshot ingestion, which stages batches for a collection.
///
/// Carried by [`StorageCommand::RunOneshotIngestion`].
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct RunOneshotIngestion {
    /// The ID of the ingestion dataflow.
    pub ingestion_id: uuid::Uuid,
    /// The ID of collection we'll stage batches for.
    pub collection_id: GlobalId,
    /// Metadata for the collection we'll stage batches for.
    pub collection_meta: CollectionMetadata,
    /// Details for the oneshot ingestion.
    pub request: OneshotIngestionRequest,
}
230
231impl RustType<ProtoRunOneshotIngestion> for RunOneshotIngestion {
232    fn into_proto(&self) -> ProtoRunOneshotIngestion {
233        ProtoRunOneshotIngestion {
234            ingestion_id: Some(self.ingestion_id.into_proto()),
235            collection_id: Some(self.collection_id.into_proto()),
236            storage_metadata: Some(self.collection_meta.into_proto()),
237            request: Some(self.request.into_proto()),
238        }
239    }
240
241    fn from_proto(proto: ProtoRunOneshotIngestion) -> Result<Self, TryFromProtoError> {
242        Ok(RunOneshotIngestion {
243            ingestion_id: proto
244                .ingestion_id
245                .into_rust_if_some("ProtoRunOneshotIngestion::ingestion_id")?,
246            collection_id: proto
247                .collection_id
248                .into_rust_if_some("ProtoRunOneshotIngestion::collection_id")?,
249            collection_meta: proto
250                .storage_metadata
251                .into_rust_if_some("ProtoRunOneshotIngestion::storage_metadata")?,
252            request: proto
253                .request
254                .into_rust_if_some("ProtoRunOneshotIngestion::request")?,
255        })
256    }
257}
258
259impl RustType<ProtoRunSinkCommand> for RunSinkCommand<mz_repr::Timestamp> {
260    fn into_proto(&self) -> ProtoRunSinkCommand {
261        ProtoRunSinkCommand {
262            id: Some(self.id.into_proto()),
263            description: Some(self.description.into_proto()),
264        }
265    }
266
267    fn from_proto(proto: ProtoRunSinkCommand) -> Result<Self, TryFromProtoError> {
268        Ok(RunSinkCommand {
269            id: proto.id.into_rust_if_some("ProtoRunSinkCommand::id")?,
270            description: proto
271                .description
272                .into_rust_if_some("ProtoRunSinkCommand::description")?,
273        })
274    }
275}
276
/// A command that starts exporting the given sink description.
///
/// Carried by [`StorageCommand::RunSinks`].
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct RunSinkCommand<T> {
    /// The identifier associated with this sink.
    pub id: GlobalId,
    /// The description of the sink to export.
    pub description: StorageSinkDesc<CollectionMetadata, T>,
}
283
284impl Arbitrary for RunSinkCommand<mz_repr::Timestamp> {
285    type Strategy = BoxedStrategy<Self>;
286    type Parameters = ();
287
288    fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
289        (
290            any::<GlobalId>(),
291            any::<StorageSinkDesc<CollectionMetadata, mz_repr::Timestamp>>(),
292        )
293            .prop_map(|(id, description)| Self { id, description })
294            .boxed()
295    }
296}
297
298impl RustType<ProtoStorageCommand> for StorageCommand<mz_repr::Timestamp> {
299    fn into_proto(&self) -> ProtoStorageCommand {
300        use proto_storage_command::Kind::*;
301        use proto_storage_command::*;
302        ProtoStorageCommand {
303            kind: Some(match self {
304                StorageCommand::CreateTimely { config, epoch } => CreateTimely(ProtoCreateTimely {
305                    config: Some(config.into_proto()),
306                    epoch: Some(epoch.into_proto()),
307                }),
308                StorageCommand::InitializationComplete => InitializationComplete(()),
309                StorageCommand::AllowWrites => AllowWrites(()),
310                StorageCommand::UpdateConfiguration(params) => {
311                    UpdateConfiguration(params.into_proto())
312                }
313                StorageCommand::AllowCompaction(collections) => {
314                    AllowCompaction(ProtoAllowCompaction {
315                        collections: collections.into_proto(),
316                    })
317                }
318                StorageCommand::RunIngestions(sources) => CreateSources(ProtoCreateSources {
319                    sources: sources.into_proto(),
320                }),
321                StorageCommand::RunSinks(sinks) => RunSinks(ProtoRunSinks {
322                    sinks: sinks.into_proto(),
323                }),
324                StorageCommand::RunOneshotIngestion(ingestions) => {
325                    RunOneshotIngestions(ProtoRunOneshotIngestionsCommand {
326                        ingestions: ingestions.iter().map(|cmd| cmd.into_proto()).collect(),
327                    })
328                }
329                StorageCommand::CancelOneshotIngestion { ingestions } => {
330                    CancelOneshotIngestions(ProtoCancelOneshotIngestionsCommand {
331                        ingestions: ingestions.iter().map(|uuid| uuid.into_proto()).collect(),
332                    })
333                }
334            }),
335        }
336    }
337
338    fn from_proto(proto: ProtoStorageCommand) -> Result<Self, TryFromProtoError> {
339        use proto_storage_command::Kind::*;
340        use proto_storage_command::*;
341        match proto.kind {
342            Some(CreateTimely(ProtoCreateTimely { config, epoch })) => {
343                Ok(StorageCommand::CreateTimely {
344                    config: config.into_rust_if_some("ProtoCreateTimely::config")?,
345                    epoch: epoch.into_rust_if_some("ProtoCreateTimely::epoch")?,
346                })
347            }
348            Some(InitializationComplete(())) => Ok(StorageCommand::InitializationComplete),
349            Some(AllowWrites(())) => Ok(StorageCommand::AllowWrites),
350            Some(UpdateConfiguration(params)) => {
351                Ok(StorageCommand::UpdateConfiguration(params.into_rust()?))
352            }
353            Some(CreateSources(ProtoCreateSources { sources })) => {
354                Ok(StorageCommand::RunIngestions(sources.into_rust()?))
355            }
356            Some(AllowCompaction(ProtoAllowCompaction { collections })) => {
357                Ok(StorageCommand::AllowCompaction(collections.into_rust()?))
358            }
359            Some(RunSinks(ProtoRunSinks { sinks })) => {
360                Ok(StorageCommand::RunSinks(sinks.into_rust()?))
361            }
362            Some(RunOneshotIngestions(oneshot)) => {
363                let ingestions = oneshot
364                    .ingestions
365                    .into_iter()
366                    .map(|cmd| cmd.into_rust())
367                    .collect::<Result<_, _>>()?;
368                Ok(StorageCommand::RunOneshotIngestion(ingestions))
369            }
370            Some(CancelOneshotIngestions(oneshot)) => {
371                let ingestions = oneshot
372                    .ingestions
373                    .into_iter()
374                    .map(|uuid| uuid.into_rust())
375                    .collect::<Result<_, _>>()?;
376                Ok(StorageCommand::CancelOneshotIngestion { ingestions })
377            }
378            None => Err(TryFromProtoError::missing_field(
379                "ProtoStorageCommand::kind",
380            )),
381        }
382    }
383}
384
385impl Arbitrary for StorageCommand<mz_repr::Timestamp> {
386    type Strategy = Union<BoxedStrategy<Self>>;
387    type Parameters = ();
388
389    fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
390        Union::new(vec![
391            // TODO(guswynn): cluster-unification: also test `CreateTimely` here.
392            proptest::collection::vec(any::<RunIngestionCommand>(), 1..4)
393                .prop_map(StorageCommand::RunIngestions)
394                .boxed(),
395            proptest::collection::vec(any::<RunSinkCommand<mz_repr::Timestamp>>(), 1..4)
396                .prop_map(StorageCommand::RunSinks)
397                .boxed(),
398            proptest::collection::vec(
399                (
400                    any::<GlobalId>(),
401                    proptest::collection::vec(any::<mz_repr::Timestamp>(), 1..4),
402                ),
403                1..4,
404            )
405            .prop_map(|collections| {
406                StorageCommand::AllowCompaction(
407                    collections
408                        .into_iter()
409                        .map(|(id, frontier_vec)| (id, Antichain::from(frontier_vec)))
410                        .collect(),
411                )
412            })
413            .boxed(),
414        ])
415    }
416}
417
/// A "kind" enum for statuses tracked by the health operator
///
/// Each variant has a lowercase string form, produced by [`Status::to_str`]
/// and parsed by the [`std::str::FromStr`] implementation. The derived
/// ordering follows the declaration order of the variants.
#[derive(Copy, Clone, Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub enum Status {
    /// String form: `"starting"`.
    Starting,
    /// String form: `"running"`.
    Running,
    /// String form: `"paused"`.
    Paused,
    /// String form: `"stalled"`.
    Stalled,
    /// This status is currently unused.
    // TODO: re-design the ceased status
    Ceased,
    /// String form: `"dropped"`. Per [`Status::superseded_by`], a dropped
    /// status supersedes every status and is superseded only by another
    /// `Dropped`.
    Dropped,
}
430
431impl std::str::FromStr for Status {
432    type Err = anyhow::Error;
433    /// Keep in sync with [`Status::to_str`].
434    fn from_str(s: &str) -> Result<Self, Self::Err> {
435        Ok(match s {
436            "starting" => Status::Starting,
437            "running" => Status::Running,
438            "paused" => Status::Paused,
439            "stalled" => Status::Stalled,
440            "ceased" => Status::Ceased,
441            "dropped" => Status::Dropped,
442            s => return Err(anyhow::anyhow!("{} is not a valid status", s)),
443        })
444    }
445}
446
447impl Status {
448    /// Keep in sync with `Status::from_str`.
449    pub fn to_str(&self) -> &'static str {
450        match self {
451            Status::Starting => "starting",
452            Status::Running => "running",
453            Status::Paused => "paused",
454            Status::Stalled => "stalled",
455            Status::Ceased => "ceased",
456            Status::Dropped => "dropped",
457        }
458    }
459
460    /// Determines if a new status should be produced in context of a previous
461    /// status.
462    pub fn superseded_by(self, new: Status) -> bool {
463        match (self, new) {
464            (_, Status::Dropped) => true,
465            (Status::Dropped, _) => false,
466            // Don't re-mark that object as paused.
467            (Status::Paused, Status::Paused) => false,
468            // De-duplication of other statuses is currently managed by the
469            // `health_operator`.
470            _ => true,
471        }
472    }
473}
474
/// A source or sink status update.
///
/// Represents a status update for a given object. The `From<StatusUpdate> for Row`
/// implementation in this file packs the fields into a status row: timestamp, id,
/// status, error, an optional details dictionary, and the replica id. The row
/// should conform to the schema for the object's status history relation.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct StatusUpdate {
    /// The id of the object this update is about.
    pub id: GlobalId,
    /// The status being reported.
    pub status: Status,
    /// The time associated with this update.
    pub timestamp: chrono::DateTime<chrono::Utc>,
    /// An optional error message; packed as null when absent.
    pub error: Option<String>,
    /// Hints, packed under the `"hints"` key of the details dictionary when non-empty.
    pub hints: BTreeSet<String>,
    /// Errors keyed by namespace, packed under the `"namespaced"` key of the
    /// details dictionary when non-empty.
    pub namespaced_errors: BTreeMap<String, String>,
    /// The replica this update is associated with, if any; packed as null when absent.
    pub replica_id: Option<ReplicaId>,
}
490
491impl StatusUpdate {
492    pub fn new(
493        id: GlobalId,
494        timestamp: chrono::DateTime<chrono::Utc>,
495        status: Status,
496    ) -> StatusUpdate {
497        StatusUpdate {
498            id,
499            timestamp,
500            status,
501            error: None,
502            hints: Default::default(),
503            namespaced_errors: Default::default(),
504            replica_id: None,
505        }
506    }
507}
508
509impl From<StatusUpdate> for Row {
510    fn from(update: StatusUpdate) -> Self {
511        use mz_repr::Datum;
512
513        let timestamp = Datum::TimestampTz(update.timestamp.try_into().expect("must fit"));
514        let id = update.id.to_string();
515        let id = Datum::String(&id);
516        let status = Datum::String(update.status.to_str());
517        let error = update.error.as_deref().into();
518
519        let mut row = Row::default();
520        let mut packer = row.packer();
521        packer.extend([timestamp, id, status, error]);
522
523        if !update.hints.is_empty() || !update.namespaced_errors.is_empty() {
524            packer.push_dict_with(|dict_packer| {
525                // `hint` and `namespaced` are ordered,
526                // as well as the BTree's they each contain.
527                if !update.hints.is_empty() {
528                    dict_packer.push(Datum::String("hints"));
529                    dict_packer.push_list(update.hints.iter().map(|s| Datum::String(s)));
530                }
531                if !update.namespaced_errors.is_empty() {
532                    dict_packer.push(Datum::String("namespaced"));
533                    dict_packer.push_dict(
534                        update
535                            .namespaced_errors
536                            .iter()
537                            .map(|(k, v)| (k.as_str(), Datum::String(v))),
538                    );
539                }
540            });
541        } else {
542            packer.push(Datum::Null);
543        }
544
545        match update.replica_id {
546            Some(id) => packer.push(Datum::String(&id.to_string())),
547            None => packer.push(Datum::Null),
548        }
549
550        row
551    }
552}
553
554impl RustType<proto_storage_response::ProtoStatus> for Status {
555    fn into_proto(&self) -> proto_storage_response::ProtoStatus {
556        use proto_storage_response::proto_status::*;
557
558        proto_storage_response::ProtoStatus {
559            kind: Some(match self {
560                Status::Starting => Kind::Starting(()),
561                Status::Running => Kind::Running(()),
562                Status::Paused => Kind::Paused(()),
563                Status::Stalled => Kind::Stalled(()),
564                Status::Ceased => Kind::Ceased(()),
565                Status::Dropped => Kind::Dropped(()),
566            }),
567        }
568    }
569
570    fn from_proto(proto: proto_storage_response::ProtoStatus) -> Result<Self, TryFromProtoError> {
571        use proto_storage_response::proto_status::*;
572        let kind = proto
573            .kind
574            .ok_or_else(|| TryFromProtoError::missing_field("ProtoStatus::kind"))?;
575
576        Ok(match kind {
577            Kind::Starting(()) => Status::Starting,
578            Kind::Running(()) => Status::Running,
579            Kind::Paused(()) => Status::Paused,
580            Kind::Stalled(()) => Status::Stalled,
581            Kind::Ceased(()) => Status::Ceased,
582            Kind::Dropped(()) => Status::Dropped,
583        })
584    }
585}
586
587impl RustType<proto_storage_response::ProtoStatusUpdate> for StatusUpdate {
588    fn into_proto(&self) -> proto_storage_response::ProtoStatusUpdate {
589        proto_storage_response::ProtoStatusUpdate {
590            id: Some(self.id.into_proto()),
591            status: Some(self.status.into_proto()),
592            timestamp: Some(self.timestamp.into_proto()),
593            error: self.error.clone(),
594            hints: self.hints.iter().cloned().collect(),
595            namespaced_errors: self.namespaced_errors.clone(),
596            replica_id: self.replica_id.map(|id| id.to_string().into_proto()),
597        }
598    }
599
600    fn from_proto(
601        proto: proto_storage_response::ProtoStatusUpdate,
602    ) -> Result<Self, TryFromProtoError> {
603        Ok(StatusUpdate {
604            id: proto.id.into_rust_if_some("ProtoStatusUpdate::id")?,
605            timestamp: proto
606                .timestamp
607                .into_rust_if_some("ProtoStatusUpdate::timestamp")?,
608            status: proto
609                .status
610                .into_rust_if_some("ProtoStatusUpdate::status")?,
611            error: proto.error,
612            hints: proto.hints.into_iter().collect(),
613            namespaced_errors: proto.namespaced_errors,
614            replica_id: proto
615                .replica_id
616                .map(|replica_id: String| replica_id.parse().expect("must be a valid replica id")),
617        })
618    }
619}
620
/// An update to an append only collection.
///
/// Either variant can be turned into a `(Row, Diff)` pair with
/// [`AppendOnlyUpdate::into_row`].
pub enum AppendOnlyUpdate {
    /// A row together with its diff.
    Row((Row, Diff)),
    /// A status update, which converts to a row with a diff of one.
    Status(StatusUpdate),
}
626
627impl AppendOnlyUpdate {
628    pub fn into_row(self) -> (Row, Diff) {
629        match self {
630            AppendOnlyUpdate::Row((row, diff)) => (row, diff),
631            AppendOnlyUpdate::Status(status) => (Row::from(status), Diff::ONE),
632        }
633    }
634}
635
636impl From<(Row, Diff)> for AppendOnlyUpdate {
637    fn from((row, diff): (Row, Diff)) -> Self {
638        Self::Row((row, diff))
639    }
640}
641
642impl From<StatusUpdate> for AppendOnlyUpdate {
643    fn from(update: StatusUpdate) -> Self {
644        Self::Status(update)
645    }
646}
647
/// Responses that the storage nature of a worker/dataflow can provide back to the coordinator.
///
/// The timestamp type `T` defaults to [`mz_repr::Timestamp`]; the protobuf
/// conversion in this file is implemented only for that default.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub enum StorageResponse<T = mz_repr::Timestamp> {
    /// A list of identifiers of traces, with new upper frontiers.
    FrontierUppers(Vec<(GlobalId, Antichain<T>)>),
    /// Punctuation indicates that no more responses will be transmitted for the specified id
    DroppedId(GlobalId),
    /// Batches that have been staged in Persist and maybe will be linked into a shard.
    ///
    /// Each entry maps a UUID to the results of staging its batches, where each
    /// result is either a staged batch or an error message.
    // NOTE(review): the key is presumably the oneshot ingestion's ID — confirm against the sender.
    StagedBatches(BTreeMap<uuid::Uuid, Vec<Result<ProtoBatch, String>>>),

    /// A list of statistics updates: source statistics first, sink statistics second.
    StatisticsUpdates(Vec<SourceStatisticsUpdate>, Vec<SinkStatisticsUpdate>),
    /// A list of status updates for sources and sinks. Periodically sent from
    /// storage workers to convey the latest status information about an object.
    StatusUpdates(Vec<StatusUpdate>),
}
664
665impl RustType<ProtoStorageResponse> for StorageResponse<mz_repr::Timestamp> {
666    fn into_proto(&self) -> ProtoStorageResponse {
667        use proto_storage_response::Kind::*;
668        use proto_storage_response::{
669            ProtoDroppedId, ProtoStagedBatches, ProtoStatisticsUpdates, ProtoStatusUpdates,
670        };
671        ProtoStorageResponse {
672            kind: Some(match self {
673                StorageResponse::FrontierUppers(traces) => FrontierUppers(traces.into_proto()),
674                StorageResponse::DroppedId(id) => DroppedId(ProtoDroppedId {
675                    id: Some(id.into_proto()),
676                }),
677                StorageResponse::StatisticsUpdates(source_stats, sink_stats) => {
678                    Stats(ProtoStatisticsUpdates {
679                        source_updates: source_stats
680                            .iter()
681                            .map(|update| update.into_proto())
682                            .collect(),
683                        sink_updates: sink_stats
684                            .iter()
685                            .map(|update| update.into_proto())
686                            .collect(),
687                    })
688                }
689                StorageResponse::StatusUpdates(updates) => StatusUpdates(ProtoStatusUpdates {
690                    updates: updates.into_proto(),
691                }),
692                StorageResponse::StagedBatches(staged) => {
693                    let batches = staged
694                        .into_iter()
695                        .map(|(collection_id, batches)| {
696                            let batches = batches
697                                .into_iter()
698                                .map(|result| {
699                                    use proto_storage_response::proto_staged_batches::batch_result::Value;
700                                    let value = match result {
701                                        Ok(batch) => Value::Batch(batch.clone()),
702                                        Err(err) => Value::Error(err.clone()),
703                                    };
704                                    proto_storage_response::proto_staged_batches::BatchResult { value: Some(value) }
705                                })
706                                .collect();
707                            proto_storage_response::proto_staged_batches::Inner {
708                                id: Some(collection_id.into_proto()),
709                                batches,
710                            }
711                        })
712                        .collect();
713                    StagedBatches(ProtoStagedBatches { batches })
714                }
715            }),
716        }
717    }
718
719    fn from_proto(proto: ProtoStorageResponse) -> Result<Self, TryFromProtoError> {
720        use proto_storage_response::Kind::*;
721        use proto_storage_response::{ProtoDroppedId, ProtoStatusUpdates};
722        match proto.kind {
723            Some(DroppedId(ProtoDroppedId { id })) => Ok(StorageResponse::DroppedId(
724                id.into_rust_if_some("ProtoDroppedId::id")?,
725            )),
726            Some(FrontierUppers(traces)) => {
727                Ok(StorageResponse::FrontierUppers(traces.into_rust()?))
728            }
729            Some(Stats(stats)) => Ok(StorageResponse::StatisticsUpdates(
730                stats
731                    .source_updates
732                    .into_iter()
733                    .map(|update| update.into_rust())
734                    .collect::<Result<Vec<_>, TryFromProtoError>>()?,
735                stats
736                    .sink_updates
737                    .into_iter()
738                    .map(|update| update.into_rust())
739                    .collect::<Result<Vec<_>, TryFromProtoError>>()?,
740            )),
741            Some(StatusUpdates(ProtoStatusUpdates { updates })) => {
742                Ok(StorageResponse::StatusUpdates(updates.into_rust()?))
743            }
744            Some(StagedBatches(staged)) => {
745                let batches: BTreeMap<_, _> = staged
746                    .batches
747                    .into_iter()
748                    .map(|inner| {
749                        let id = inner
750                            .id
751                            .into_rust_if_some("ProtoStagedBatches::Inner::id")?;
752
753                        let mut batches = Vec::with_capacity(inner.batches.len());
754                        for maybe_batch in inner.batches {
755                            use proto_storage_response::proto_staged_batches::batch_result::Value;
756
757                            let value = maybe_batch.value.ok_or_else(|| {
758                                TryFromProtoError::missing_field("BatchResult::value")
759                            })?;
760                            let batch = match value {
761                                Value::Batch(batch) => Ok(batch),
762                                Value::Error(err) => Err(err),
763                            };
764                            batches.push(batch);
765                        }
766
767                        Ok::<_, TryFromProtoError>((id, batches))
768                    })
769                    .collect::<Result<_, _>>()?;
770
771                Ok(StorageResponse::StagedBatches(batches))
772            }
773            None => Err(TryFromProtoError::missing_field(
774                "ProtoStorageResponse::kind",
775            )),
776        }
777    }
778}
779
780impl Arbitrary for StorageResponse<mz_repr::Timestamp> {
781    type Strategy = Union<BoxedStrategy<Self>>;
782    type Parameters = ();
783
784    fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
785        // TODO(guswynn): test `SourceStatisticsUpdates`
786        Union::new(vec![
787            proptest::collection::vec((any::<GlobalId>(), any_antichain()), 1..4)
788                .prop_map(StorageResponse::FrontierUppers)
789                .boxed(),
790        ])
791    }
792}
793
794/// Maintained state for partitioned storage clients.
795///
796/// This helper type unifies the responses of multiple partitioned
797/// workers in order to present as a single worker.
798#[derive(Debug)]
799pub struct PartitionedStorageState<T> {
800    /// Number of partitions the state machine represents.
801    parts: usize,
802    /// Upper frontiers for sources and sinks, both unioned across all partitions and from each
803    /// individual partition.
804    uppers: BTreeMap<GlobalId, (MutableAntichain<T>, Vec<Option<Antichain<T>>>)>,
805    /// Staged batches from oneshot sources that will get appended by `environmentd`.
806    oneshot_source_responses:
807        BTreeMap<uuid::Uuid, BTreeMap<usize, Vec<Result<ProtoBatch, String>>>>,
808}
809
810impl<T> Partitionable<StorageCommand<T>, StorageResponse<T>>
811    for (StorageCommand<T>, StorageResponse<T>)
812where
813    T: timely::progress::Timestamp + Lattice,
814{
815    type PartitionedState = PartitionedStorageState<T>;
816
817    fn new(parts: usize) -> PartitionedStorageState<T> {
818        PartitionedStorageState {
819            parts,
820            uppers: BTreeMap::new(),
821            oneshot_source_responses: BTreeMap::new(),
822        }
823    }
824}
825
826impl<T> PartitionedStorageState<T>
827where
828    T: timely::progress::Timestamp,
829{
830    fn observe_command(&mut self, command: &StorageCommand<T>) {
831        // Note that `observe_command` is quite different in `mz_compute_client`.
832        // Compute (currently) only sends the command to 1 process,
833        // but storage fans out to all workers, allowing the storage processes
834        // to self-coordinate how commands and internal commands are ordered.
835        //
836        // TODO(guswynn): cluster-unification: consolidate this with compute.
837        let _ = match command {
838            StorageCommand::CreateTimely { .. } => {
839                // Similarly, we don't reset state here like compute, because,
840                // until we are required to manage multiple replicas, we can handle
841                // keeping track of state across restarts of storage server(s).
842            }
843            StorageCommand::RunIngestions(ingestions) => ingestions
844                .iter()
845                .for_each(|i| self.insert_new_uppers(i.description.collection_ids())),
846            StorageCommand::RunSinks(exports) => {
847                exports.iter().for_each(|e| self.insert_new_uppers([e.id]))
848            }
849            StorageCommand::InitializationComplete
850            | StorageCommand::AllowWrites
851            | StorageCommand::UpdateConfiguration(_)
852            | StorageCommand::AllowCompaction(_)
853            | StorageCommand::RunOneshotIngestion(_)
854            | StorageCommand::CancelOneshotIngestion { .. } => {}
855        };
856    }
857
858    /// Shared implementation for commands that install uppers with controllable behavior with
859    /// encountering existing uppers.
860    ///
861    /// If any ID was previously tracked in `self` and `skip_existing` is `false`, we return the ID
862    /// as an error.
863    fn insert_new_uppers<I: IntoIterator<Item = GlobalId>>(&mut self, ids: I) {
864        for id in ids {
865            self.uppers.entry(id).or_insert_with(|| {
866                let mut frontier = MutableAntichain::new();
867                // TODO(guswynn): cluster-unification: fix this dangerous use of `as`, by
868                // merging the types that compute and storage use.
869                #[allow(clippy::as_conversions)]
870                frontier.update_iter(iter::once((T::minimum(), self.parts as i64)));
871                let part_frontiers = vec![Some(Antichain::from_elem(T::minimum())); self.parts];
872
873                (frontier, part_frontiers)
874            });
875        }
876    }
877}
878
879impl<T> PartitionedState<StorageCommand<T>, StorageResponse<T>> for PartitionedStorageState<T>
880where
881    T: timely::progress::Timestamp + Lattice,
882{
883    fn split_command(&mut self, command: StorageCommand<T>) -> Vec<Option<StorageCommand<T>>> {
884        self.observe_command(&command);
885
886        match command {
887            StorageCommand::CreateTimely { config, epoch } => {
888                let timely_cmds = config.split_command(self.parts);
889
890                let timely_cmds = timely_cmds
891                    .into_iter()
892                    .map(|config| Some(StorageCommand::CreateTimely { config, epoch }))
893                    .collect();
894                timely_cmds
895            }
896            command => {
897                // Fan out to all processes (which will fan out to all workers).
898                // StorageState manages ordering of commands internally.
899                vec![Some(command); self.parts]
900            }
901        }
902    }
903
904    fn absorb_response(
905        &mut self,
906        shard_id: usize,
907        response: StorageResponse<T>,
908    ) -> Option<Result<StorageResponse<T>, anyhow::Error>> {
909        match response {
910            // Avoid multiple retractions of minimum time, to present as updates from one worker.
911            StorageResponse::FrontierUppers(list) => {
912                let mut new_uppers = Vec::new();
913
914                for (id, new_shard_upper) in list {
915                    let (frontier, shard_frontiers) = match self.uppers.get_mut(&id) {
916                        Some(value) => value,
917                        None => panic!("Reference to absent collection: {id}"),
918                    };
919                    let old_upper = frontier.frontier().to_owned();
920                    let shard_upper = match &mut shard_frontiers[shard_id] {
921                        Some(shard_upper) => shard_upper,
922                        None => panic!("Reference to absent shard {shard_id} for collection {id}"),
923                    };
924                    frontier.update_iter(shard_upper.iter().map(|t| (t.clone(), -1)));
925                    frontier.update_iter(new_shard_upper.iter().map(|t| (t.clone(), 1)));
926                    shard_upper.join_assign(&new_shard_upper);
927
928                    let new_upper = frontier.frontier();
929                    if PartialOrder::less_than(&old_upper.borrow(), &new_upper) {
930                        new_uppers.push((id, new_upper.to_owned()));
931                    }
932                }
933
934                if new_uppers.is_empty() {
935                    None
936                } else {
937                    Some(Ok(StorageResponse::FrontierUppers(new_uppers)))
938                }
939            }
940            StorageResponse::DroppedId(id) => {
941                let (_, shard_frontiers) = match self.uppers.get_mut(&id) {
942                    Some(value) => value,
943                    None => panic!("Reference to absent collection: {id}"),
944                };
945                let prev = shard_frontiers[shard_id].take();
946                assert!(
947                    prev.is_some(),
948                    "got double drop for {id} from shard {shard_id}"
949                );
950
951                if shard_frontiers.iter().all(Option::is_none) {
952                    self.uppers.remove(&id);
953                    Some(Ok(StorageResponse::DroppedId(id)))
954                } else {
955                    None
956                }
957            }
958            StorageResponse::StatisticsUpdates(source_stats, sink_stats) => {
959                // Just forward it along; the `worker_id` should have been set in `storage_state`.
960                // We _could_ consolidate across worker_id's, here, but each worker only produces
961                // responses periodically, so we avoid that complexity.
962                Some(Ok(StorageResponse::StatisticsUpdates(
963                    source_stats,
964                    sink_stats,
965                )))
966            }
967            StorageResponse::StatusUpdates(updates) => {
968                Some(Ok(StorageResponse::StatusUpdates(updates)))
969            }
970            StorageResponse::StagedBatches(batches) => {
971                let mut finished_batches = BTreeMap::new();
972
973                for (collection_id, batches) in batches {
974                    tracing::info!(%shard_id, %collection_id, "got batch");
975
976                    let entry = self
977                        .oneshot_source_responses
978                        .entry(collection_id)
979                        .or_default();
980                    let novel = entry.insert(shard_id, batches);
981                    assert_none!(novel, "Duplicate oneshot source response");
982
983                    // Check if we've received responses from all shards.
984                    if entry.len() == self.parts {
985                        let entry = self
986                            .oneshot_source_responses
987                            .remove(&collection_id)
988                            .expect("checked above");
989                        let all_batches: Vec<_> = entry.into_values().flatten().collect();
990
991                        finished_batches.insert(collection_id, all_batches);
992                    }
993                }
994
995                if !finished_batches.is_empty() {
996                    Some(Ok(StorageResponse::StagedBatches(finished_batches)))
997                } else {
998                    None
999                }
1000            }
1001        }
1002    }
1003}
1004
1005#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
1006/// A batch of updates to be fed to a local input
1007pub struct Update<T = mz_repr::Timestamp> {
1008    pub row: Row,
1009    pub timestamp: T,
1010    pub diff: Diff,
1011}
1012
1013#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
1014/// A batch of updates to be fed to a local input; however, the input must
1015/// determine the most appropriate timestamps to use.
1016///
1017/// TODO(cf2): Can we remove this and use only on [`TableData`].
1018pub struct TimestamplessUpdate {
1019    pub row: Row,
1020    pub diff: Diff,
1021}
1022
1023#[derive(Debug, Clone, PartialEq)]
1024pub enum TableData {
1025    /// Rows that still need to be persisted and appended.
1026    ///
1027    /// The contained [`Row`]s are _not_ consolidated.
1028    Rows(Vec<(Row, Diff)>),
1029    /// Batches already staged in Persist ready to be appended.
1030    Batches(SmallVec<[ProtoBatch; 1]>),
1031}
1032
1033impl TableData {
1034    pub fn is_empty(&self) -> bool {
1035        match self {
1036            TableData::Rows(rows) => rows.is_empty(),
1037            TableData::Batches(batches) => batches.is_empty(),
1038        }
1039    }
1040}
1041
1042/// A collection of timestamp-less updates. As updates are added to the builder
1043/// they are automatically spilled to blob storage.
1044pub struct TimestamplessUpdateBuilder<K, V, T, D>
1045where
1046    K: Codec,
1047    V: Codec,
1048    T: Timestamp + Lattice + Codec64,
1049    D: Codec64,
1050{
1051    builder: BatchBuilder<K, V, T, D>,
1052    initial_ts: T,
1053}
1054
1055impl<K, V, T, D> TimestamplessUpdateBuilder<K, V, T, D>
1056where
1057    K: Debug + Codec,
1058    V: Debug + Codec,
1059    T: TimestampManipulation + Lattice + Codec64 + Sync,
1060    D: Semigroup + Ord + Codec64 + Send + Sync,
1061{
1062    /// Create a new [`TimestamplessUpdateBuilder`] for the shard associated
1063    /// with the provided [`WriteHandle`].
1064    pub fn new(handle: &WriteHandle<K, V, T, D>) -> Self {
1065        let initial_ts = T::minimum();
1066        let builder = handle.builder(Antichain::from_elem(initial_ts.clone()));
1067        TimestamplessUpdateBuilder {
1068            builder,
1069            initial_ts,
1070        }
1071    }
1072
1073    /// Add a `(K, V, D)` to the staged batch.
1074    pub async fn add(&mut self, k: &K, v: &V, d: &D) {
1075        self.builder
1076            .add(k, v, &self.initial_ts, d)
1077            .await
1078            .expect("invalid Persist usage");
1079    }
1080
1081    /// Finish the builder and return a [`ProtoBatch`] which can later be linked into a shard.
1082    ///
1083    /// The returned batch has nonsensical lower and upper bounds and must be re-written before
1084    /// appending into the destination shard.
1085    pub async fn finish(self) -> ProtoBatch {
1086        let finish_ts = StepForward::step_forward(&self.initial_ts);
1087        let batch = self
1088            .builder
1089            .finish(Antichain::from_elem(finish_ts))
1090            .await
1091            .expect("invalid Persist usage");
1092
1093        batch.into_transmittable_batch()
1094    }
1095}
1096
1097impl RustType<ProtoTrace> for (GlobalId, Antichain<mz_repr::Timestamp>) {
1098    fn into_proto(&self) -> ProtoTrace {
1099        ProtoTrace {
1100            id: Some(self.0.into_proto()),
1101            upper: Some(self.1.into_proto()),
1102        }
1103    }
1104
1105    fn from_proto(proto: ProtoTrace) -> Result<Self, TryFromProtoError> {
1106        Ok((
1107            proto.id.into_rust_if_some("ProtoTrace::id")?,
1108            proto.upper.into_rust_if_some("ProtoTrace::upper")?,
1109        ))
1110    }
1111}
1112
1113impl RustType<ProtoFrontierUppersKind> for Vec<(GlobalId, Antichain<mz_repr::Timestamp>)> {
1114    fn into_proto(&self) -> ProtoFrontierUppersKind {
1115        ProtoFrontierUppersKind {
1116            traces: self.into_proto(),
1117        }
1118    }
1119
1120    fn from_proto(proto: ProtoFrontierUppersKind) -> Result<Self, TryFromProtoError> {
1121        proto.traces.into_rust()
1122    }
1123}
1124
1125impl RustType<ProtoCompaction> for (GlobalId, Antichain<mz_repr::Timestamp>) {
1126    fn into_proto(&self) -> ProtoCompaction {
1127        ProtoCompaction {
1128            id: Some(self.0.into_proto()),
1129            frontier: Some(self.1.into_proto()),
1130        }
1131    }
1132
1133    fn from_proto(proto: ProtoCompaction) -> Result<Self, TryFromProtoError> {
1134        Ok((
1135            proto.id.into_rust_if_some("ProtoCompaction::id")?,
1136            proto
1137                .frontier
1138                .into_rust_if_some("ProtoCompaction::frontier")?,
1139        ))
1140    }
1141}
1142
1143impl TryIntoTimelyConfig for StorageCommand {
1144    fn try_into_timely_config(self) -> Result<(TimelyConfig, ClusterStartupEpoch), Self> {
1145        match self {
1146            StorageCommand::CreateTimely { config, epoch } => Ok((config, epoch)),
1147            cmd => Err(cmd),
1148        }
1149    }
1150}
1151
#[cfg(test)]
mod tests {
    use mz_ore::assert_ok;
    use mz_proto::protobuf_roundtrip;
    use proptest::prelude::ProptestConfig;
    use proptest::proptest;

    use super::*;

    proptest! {
        #![proptest_config(ProptestConfig::with_cases(32))]

        /// An arbitrary storage command must survive a protobuf roundtrip unchanged.
        #[mz_ore::test]
        #[cfg_attr(miri, ignore)] // too slow
        fn storage_command_protobuf_roundtrip(command in any::<StorageCommand<mz_repr::Timestamp>>()) {
            let roundtripped = protobuf_roundtrip::<_, ProtoStorageCommand>(&command);
            assert_ok!(roundtripped);
            assert_eq!(roundtripped.unwrap(), command);
        }

        /// An arbitrary storage response must survive a protobuf roundtrip unchanged.
        #[mz_ore::test]
        #[cfg_attr(miri, ignore)] // too slow
        fn storage_response_protobuf_roundtrip(response in any::<StorageResponse<mz_repr::Timestamp>>()) {
            let roundtripped = protobuf_roundtrip::<_, ProtoStorageResponse>(&response);
            assert_ok!(roundtripped);
            assert_eq!(roundtripped.unwrap(), response);
        }
    }
}