mz_storage_client/
client.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

#![allow(missing_docs)]
// Tonic generates code that violates clippy lints.
// TODO: Remove this once tonic does not produce this code anymore.
#![allow(clippy::as_conversions, clippy::clone_on_ref_ptr)]

//! The public API of the storage layer.

use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Debug;
use std::iter;

use async_trait::async_trait;
use differential_dataflow::difference::Semigroup;
use differential_dataflow::lattice::Lattice;
use mz_cluster_client::ReplicaId;
use mz_cluster_client::client::{ClusterStartupEpoch, TimelyConfig, TryIntoTimelyConfig};
use mz_ore::assert_none;
use mz_persist_client::batch::{BatchBuilder, ProtoBatch};
use mz_persist_client::write::WriteHandle;
use mz_persist_types::{Codec, Codec64, StepForward};
use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError};
use mz_repr::{Diff, GlobalId, Row, TimestampManipulation};
use mz_service::client::{GenericClient, Partitionable, PartitionedState};
use mz_service::grpc::{GrpcClient, GrpcServer, ProtoServiceTypes, ResponseStream};
use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::oneshot_sources::OneshotIngestionRequest;
use mz_storage_types::parameters::StorageParameters;
use mz_storage_types::sinks::StorageSinkDesc;
use mz_storage_types::sources::IngestionDescription;
use mz_timely_util::progress::any_antichain;
use proptest::prelude::{Arbitrary, any};
use proptest::strategy::{BoxedStrategy, Strategy, Union};
use serde::{Deserialize, Serialize};
use smallvec::SmallVec;
use timely::PartialOrder;
use timely::progress::Timestamp;
use timely::progress::frontier::{Antichain, MutableAntichain};
use tonic::{Request, Status as TonicStatus, Streaming};
use uuid::Uuid;

use crate::client::proto_storage_server::ProtoStorage;
use crate::metrics::ReplicaMetrics;
use crate::statistics::{SinkStatisticsUpdate, SourceStatisticsUpdate};

include!(concat!(env!("OUT_DIR"), "/mz_storage_client.client.rs"));

/// A client to a storage server.
pub trait StorageClient<T = mz_repr::Timestamp>:
    GenericClient<StorageCommand<T>, StorageResponse<T>>
{
}

impl<C, T> StorageClient<T> for C where C: GenericClient<StorageCommand<T>, StorageResponse<T>> {}

#[async_trait]
impl<T: Send> GenericClient<StorageCommand<T>, StorageResponse<T>> for Box<dyn StorageClient<T>> {
    async fn send(&mut self, cmd: StorageCommand<T>) -> Result<(), anyhow::Error> {
        (**self).send(cmd).await
    }

    /// # Cancel safety
    ///
    /// This method is cancel safe. If `recv` is used as the event in a [`tokio::select!`]
    /// statement and some other branch completes first, it is guaranteed that no messages were
    /// received by this client.
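    ///
    /// # Example
    ///
    /// An illustrative sketch (not compiled here) of the select-loop usage this
    /// guarantee enables; `client` and `shutdown_rx` are hypothetical locals.
    ///
    /// ```ignore
    /// loop {
    ///     tokio::select! {
    ///         // Safe: if the other branch wins, no response is lost.
    ///         response = client.recv() => handle_response(response?),
    ///         _ = shutdown_rx.changed() => break,
    ///     }
    /// }
    /// ```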
    async fn recv(&mut self) -> Result<Option<StorageResponse<T>>, anyhow::Error> {
        // `GenericClient::recv` is required to be cancel safe.
        (**self).recv().await
    }
}

#[derive(Debug, Clone)]
pub enum StorageProtoServiceTypes {}

impl ProtoServiceTypes for StorageProtoServiceTypes {
    type PC = ProtoStorageCommand;
    type PR = ProtoStorageResponse;
    type STATS = ReplicaMetrics;
    const URL: &'static str = "/mz_storage_client.client.ProtoStorage/CommandResponseStream";
}

pub type StorageGrpcClient = GrpcClient<StorageProtoServiceTypes>;

#[async_trait]
impl<F, G> ProtoStorage for GrpcServer<F>
where
    F: Fn() -> G + Send + Sync + 'static,
    G: StorageClient + 'static,
{
    type CommandResponseStreamStream = ResponseStream<ProtoStorageResponse>;

    async fn command_response_stream(
        &self,
        request: Request<Streaming<ProtoStorageCommand>>,
    ) -> Result<tonic::Response<Self::CommandResponseStreamStream>, TonicStatus> {
        self.forward_bidi_stream(request).await
    }
}

/// Commands related to the ingress and egress of collections.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub enum StorageCommand<T = mz_repr::Timestamp> {
    /// Specifies to the storage server(s) the shape of the timely cluster
    /// we want created, before other commands are sent.
    CreateTimely {
        config: Box<TimelyConfig>,
        epoch: ClusterStartupEpoch,
    },
    /// Indicates that the controller has sent all commands reflecting its
    /// initial state.
    InitializationComplete,
    /// `AllowWrites` informs the replica that it can transition out of the
    /// read-only stage and into the read-write computation stage.
    /// It is now allowed to effect changes to external systems (writes).
    ///
    /// See `ComputeCommand::AllowWrites` for details. This command works
    /// analogously to the compute version.
    AllowWrites,
    /// Update storage instance configuration.
    UpdateConfiguration(Box<StorageParameters>),
    /// Run the specified ingestion dataflow.
    RunIngestion(Box<RunIngestionCommand>),
    /// Enable compaction in storage-managed collections.
    ///
    /// A collection id and a frontier after which accumulations must be correct.
    AllowCompaction(GlobalId, Antichain<T>),
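    /// Run the specified sink dataflow.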
    RunSink(Box<RunSinkCommand<T>>),
    /// Run a dataflow which will ingest data from an external source and only __stage__ it in
    /// Persist.
    ///
    /// Unlike regular ingestions/sources, some other component (e.g. `environmentd`) is
    /// responsible for linking the staged data into a shard.
    RunOneshotIngestion(Box<RunOneshotIngestion>),
    /// `CancelOneshotIngestion` instructs the replica to cancel the identified oneshot ingestion.
    ///
    /// It is invalid to send a [`CancelOneshotIngestion`] command that references a oneshot
    /// ingestion that was not previously created by a corresponding [`RunOneshotIngestion`]
    /// command. Doing so may cause the replica to exhibit undefined behavior.
    ///
    /// [`CancelOneshotIngestion`]: crate::client::StorageCommand::CancelOneshotIngestion
    /// [`RunOneshotIngestion`]: crate::client::StorageCommand::RunOneshotIngestion
    CancelOneshotIngestion(Uuid),
}

impl<T> StorageCommand<T> {
    /// Returns whether this command instructs the installation of storage objects.
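    ///
    /// # Example
    ///
    /// A minimal illustrative sketch of the distinction this method draws:
    /// configuration-only commands do not install objects, while `RunIngestion`,
    /// `RunSink`, and `RunOneshotIngestion` do.
    ///
    /// ```ignore
    /// let cmd = StorageCommand::<mz_repr::Timestamp>::InitializationComplete;
    /// assert!(!cmd.installs_objects());
    /// ```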
    pub fn installs_objects(&self) -> bool {
        use StorageCommand::*;
        match self {
            CreateTimely { .. }
            | InitializationComplete
            | AllowWrites
            | UpdateConfiguration(_)
            | AllowCompaction(_, _)
            | CancelOneshotIngestion { .. } => false,
            // TODO(cf2): multi-replica oneshot ingestions. At the moment, returning
            // true here means we can't run `COPY FROM` on multi-replica clusters, but
            // this should be easy enough to support.
            RunIngestion(_) | RunSink(_) | RunOneshotIngestion(_) => true,
        }
    }
}

/// A command that starts ingesting the given ingestion description
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct RunIngestionCommand {
    /// The id of the storage collection being ingested.
    pub id: GlobalId,
    /// The description of what source type should be ingested and what post-processing
    /// steps must be applied to the data before writing it to the storage collection.
    pub description: IngestionDescription<CollectionMetadata>,
}

impl Arbitrary for RunIngestionCommand {
    type Strategy = BoxedStrategy<Self>;
    type Parameters = ();

    fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
        (
            any::<GlobalId>(),
            any::<IngestionDescription<CollectionMetadata>>(),
        )
            .prop_map(|(id, description)| Self { id, description })
            .boxed()
    }
}

impl RustType<ProtoRunIngestionCommand> for RunIngestionCommand {
    fn into_proto(&self) -> ProtoRunIngestionCommand {
        ProtoRunIngestionCommand {
            id: Some(self.id.into_proto()),
            description: Some(self.description.into_proto()),
        }
    }

    fn from_proto(proto: ProtoRunIngestionCommand) -> Result<Self, TryFromProtoError> {
        Ok(RunIngestionCommand {
            id: proto.id.into_rust_if_some("ProtoRunIngestionCommand::id")?,
            description: proto
                .description
                .into_rust_if_some("ProtoRunIngestionCommand::description")?,
        })
    }
}

/// A command that starts a oneshot ingestion with the given description.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct RunOneshotIngestion {
    /// The ID of the ingestion dataflow.
    pub ingestion_id: uuid::Uuid,
    /// The ID of the collection we'll stage batches for.
    pub collection_id: GlobalId,
    /// Metadata for the collection we'll stage batches for.
    pub collection_meta: CollectionMetadata,
    /// Details for the oneshot ingestion.
    pub request: OneshotIngestionRequest,
}

impl RustType<ProtoRunOneshotIngestion> for RunOneshotIngestion {
    fn into_proto(&self) -> ProtoRunOneshotIngestion {
        ProtoRunOneshotIngestion {
            ingestion_id: Some(self.ingestion_id.into_proto()),
            collection_id: Some(self.collection_id.into_proto()),
            storage_metadata: Some(self.collection_meta.into_proto()),
            request: Some(self.request.into_proto()),
        }
    }

    fn from_proto(proto: ProtoRunOneshotIngestion) -> Result<Self, TryFromProtoError> {
        Ok(RunOneshotIngestion {
            ingestion_id: proto
                .ingestion_id
                .into_rust_if_some("ProtoRunOneshotIngestion::ingestion_id")?,
            collection_id: proto
                .collection_id
                .into_rust_if_some("ProtoRunOneshotIngestion::collection_id")?,
            collection_meta: proto
                .storage_metadata
                .into_rust_if_some("ProtoRunOneshotIngestion::storage_metadata")?,
            request: proto
                .request
                .into_rust_if_some("ProtoRunOneshotIngestion::request")?,
        })
    }
}

impl RustType<ProtoRunSinkCommand> for RunSinkCommand<mz_repr::Timestamp> {
    fn into_proto(&self) -> ProtoRunSinkCommand {
        ProtoRunSinkCommand {
            id: Some(self.id.into_proto()),
            description: Some(self.description.into_proto()),
        }
    }

    fn from_proto(proto: ProtoRunSinkCommand) -> Result<Self, TryFromProtoError> {
        Ok(RunSinkCommand {
            id: proto.id.into_rust_if_some("ProtoRunSinkCommand::id")?,
            description: proto
                .description
                .into_rust_if_some("ProtoRunSinkCommand::description")?,
        })
    }
}

/// A command that starts exporting the given sink description
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct RunSinkCommand<T> {
    pub id: GlobalId,
    pub description: StorageSinkDesc<CollectionMetadata, T>,
}

impl Arbitrary for RunSinkCommand<mz_repr::Timestamp> {
    type Strategy = BoxedStrategy<Self>;
    type Parameters = ();

    fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
        (
            any::<GlobalId>(),
            any::<StorageSinkDesc<CollectionMetadata, mz_repr::Timestamp>>(),
        )
            .prop_map(|(id, description)| Self { id, description })
            .boxed()
    }
}

impl RustType<ProtoStorageCommand> for StorageCommand<mz_repr::Timestamp> {
    fn into_proto(&self) -> ProtoStorageCommand {
        use proto_storage_command::Kind::*;
        use proto_storage_command::*;
        ProtoStorageCommand {
            kind: Some(match self {
                StorageCommand::CreateTimely { config, epoch } => CreateTimely(ProtoCreateTimely {
                    config: Some(*config.into_proto()),
                    epoch: Some(epoch.into_proto()),
                }),
                StorageCommand::InitializationComplete => InitializationComplete(()),
                StorageCommand::AllowWrites => AllowWrites(()),
                StorageCommand::UpdateConfiguration(params) => {
                    UpdateConfiguration(*params.into_proto())
                }
                StorageCommand::AllowCompaction(id, frontier) => AllowCompaction(ProtoCompaction {
                    id: Some(id.into_proto()),
                    frontier: Some(frontier.into_proto()),
                }),
                StorageCommand::RunIngestion(ingestion) => RunIngestion(*ingestion.into_proto()),
                StorageCommand::RunSink(sink) => RunSink(*sink.into_proto()),
                StorageCommand::RunOneshotIngestion(ingestion) => {
                    RunOneshotIngestion(*ingestion.into_proto())
                }
                StorageCommand::CancelOneshotIngestion(uuid) => {
                    CancelOneshotIngestion(uuid.into_proto())
                }
            }),
        }
    }

    fn from_proto(proto: ProtoStorageCommand) -> Result<Self, TryFromProtoError> {
        use proto_storage_command::Kind::*;
        use proto_storage_command::*;
        match proto.kind {
            Some(CreateTimely(ProtoCreateTimely { config, epoch })) => {
                let config = Box::new(config.into_rust_if_some("ProtoCreateTimely::config")?);
                let epoch = epoch.into_rust_if_some("ProtoCreateTimely::epoch")?;
                Ok(StorageCommand::CreateTimely { config, epoch })
            }
            Some(InitializationComplete(())) => Ok(StorageCommand::InitializationComplete),
            Some(AllowWrites(())) => Ok(StorageCommand::AllowWrites),
            Some(UpdateConfiguration(params)) => {
                let params = Box::new(params.into_rust()?);
                Ok(StorageCommand::UpdateConfiguration(params))
            }
            Some(RunIngestion(ingestion)) => {
                let ingestion = Box::new(ingestion.into_rust()?);
                Ok(StorageCommand::RunIngestion(ingestion))
            }
            Some(AllowCompaction(ProtoCompaction { id, frontier })) => {
                Ok(StorageCommand::AllowCompaction(
                    id.into_rust_if_some("ProtoCompaction::id")?,
                    frontier.into_rust_if_some("ProtoCompaction::frontier")?,
                ))
            }
            Some(RunSink(sink)) => {
                let sink = Box::new(sink.into_rust()?);
                Ok(StorageCommand::RunSink(sink))
            }
            Some(RunOneshotIngestion(ingestion)) => {
                let ingestion = Box::new(ingestion.into_rust()?);
                Ok(StorageCommand::RunOneshotIngestion(ingestion))
            }
            Some(CancelOneshotIngestion(uuid)) => {
                Ok(StorageCommand::CancelOneshotIngestion(uuid.into_rust()?))
            }
            None => Err(TryFromProtoError::missing_field(
                "ProtoStorageCommand::kind",
            )),
        }
    }
}

impl Arbitrary for StorageCommand<mz_repr::Timestamp> {
    type Strategy = Union<BoxedStrategy<Self>>;
    type Parameters = ();

    fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
        Union::new(vec![
            // TODO(guswynn): cluster-unification: also test `CreateTimely` here.
            any::<RunIngestionCommand>()
                .prop_map(Box::new)
                .prop_map(StorageCommand::RunIngestion)
                .boxed(),
            any::<RunSinkCommand<mz_repr::Timestamp>>()
                .prop_map(Box::new)
                .prop_map(StorageCommand::RunSink)
                .boxed(),
            (any::<GlobalId>(), any_antichain())
                .prop_map(|(id, frontier)| StorageCommand::AllowCompaction(id, frontier))
                .boxed(),
        ])
    }
}

/// A "kind" enum for statuses tracked by the health operator
#[derive(Copy, Clone, Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub enum Status {
    Starting,
    Running,
    Paused,
    Stalled,
    /// This status is currently unused.
    // re-design the ceased status
    Ceased,
    Dropped,
}

impl std::str::FromStr for Status {
    type Err = anyhow::Error;
    /// Keep in sync with [`Status::to_str`].
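    ///
    /// # Example
    ///
    /// A small illustrative sketch (not part of the original docs) of the round
    /// trip between parsing and [`Status::to_str`].
    ///
    /// ```ignore
    /// let status: Status = "running".parse().unwrap();
    /// assert_eq!(status, Status::Running);
    /// assert_eq!(status.to_str(), "running");
    /// ```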
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        Ok(match s {
            "starting" => Status::Starting,
            "running" => Status::Running,
            "paused" => Status::Paused,
            "stalled" => Status::Stalled,
            "ceased" => Status::Ceased,
            "dropped" => Status::Dropped,
            s => return Err(anyhow::anyhow!("{} is not a valid status", s)),
        })
    }
}

impl Status {
    /// Keep in sync with `Status::from_str`.
    pub fn to_str(&self) -> &'static str {
        match self {
            Status::Starting => "starting",
            Status::Running => "running",
            Status::Paused => "paused",
            Status::Stalled => "stalled",
            Status::Ceased => "ceased",
            Status::Dropped => "dropped",
        }
    }

    /// Determines if a new status should be produced in the context of a previous
    /// status.
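    ///
    /// # Example
    ///
    /// An illustrative sketch of the rules encoded below: a transition to
    /// `Dropped` always supersedes, and repeated `Paused` updates are suppressed.
    ///
    /// ```ignore
    /// assert!(Status::Running.superseded_by(Status::Dropped));
    /// assert!(!Status::Paused.superseded_by(Status::Paused));
    /// ```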
    pub fn superseded_by(self, new: Status) -> bool {
        match (self, new) {
            (_, Status::Dropped) => true,
            (Status::Dropped, _) => false,
            // Don't re-mark the object as paused.
            (Status::Paused, Status::Paused) => false,
            // De-duplication of other statuses is currently managed by the
            // `health_operator`.
            _ => true,
        }
    }
}

/// A source or sink status update.
///
/// Represents a status update for a given object type. Its contents should be able to
/// be packed into a status row that conforms to the schema for the object's status
/// history relation.
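///
/// # Example
///
/// An illustrative sketch (not from the original docs) of constructing an update and
/// packing it into a [`Row`] via the `From` impl below; `id` is a hypothetical
/// collection ID.
///
/// ```ignore
/// let update = StatusUpdate::new(id, chrono::Utc::now(), Status::Running);
/// let row: Row = update.into();
/// ```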
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct StatusUpdate {
    pub id: GlobalId,
    pub status: Status,
    pub timestamp: chrono::DateTime<chrono::Utc>,
    pub error: Option<String>,
    pub hints: BTreeSet<String>,
    pub namespaced_errors: BTreeMap<String, String>,
    pub replica_id: Option<ReplicaId>,
}

impl StatusUpdate {
    pub fn new(
        id: GlobalId,
        timestamp: chrono::DateTime<chrono::Utc>,
        status: Status,
    ) -> StatusUpdate {
        StatusUpdate {
            id,
            timestamp,
            status,
            error: None,
            hints: Default::default(),
            namespaced_errors: Default::default(),
            replica_id: None,
        }
    }
}

impl From<StatusUpdate> for Row {
    fn from(update: StatusUpdate) -> Self {
        use mz_repr::Datum;

        let timestamp = Datum::TimestampTz(update.timestamp.try_into().expect("must fit"));
        let id = update.id.to_string();
        let id = Datum::String(&id);
        let status = Datum::String(update.status.to_str());
        let error = update.error.as_deref().into();

        let mut row = Row::default();
        let mut packer = row.packer();
        packer.extend([timestamp, id, status, error]);

        if !update.hints.is_empty() || !update.namespaced_errors.is_empty() {
            packer.push_dict_with(|dict_packer| {
                // `hints` and `namespaced` are ordered,
                // as are the BTrees they each contain.
                if !update.hints.is_empty() {
                    dict_packer.push(Datum::String("hints"));
                    dict_packer.push_list(update.hints.iter().map(|s| Datum::String(s)));
                }
                if !update.namespaced_errors.is_empty() {
                    dict_packer.push(Datum::String("namespaced"));
                    dict_packer.push_dict(
                        update
                            .namespaced_errors
                            .iter()
                            .map(|(k, v)| (k.as_str(), Datum::String(v))),
                    );
                }
            });
        } else {
            packer.push(Datum::Null);
        }

        match update.replica_id {
            Some(id) => packer.push(Datum::String(&id.to_string())),
            None => packer.push(Datum::Null),
        }

        row
    }
}

impl RustType<proto_storage_response::ProtoStatus> for Status {
    fn into_proto(&self) -> proto_storage_response::ProtoStatus {
        use proto_storage_response::proto_status::*;

        proto_storage_response::ProtoStatus {
            kind: Some(match self {
                Status::Starting => Kind::Starting(()),
                Status::Running => Kind::Running(()),
                Status::Paused => Kind::Paused(()),
                Status::Stalled => Kind::Stalled(()),
                Status::Ceased => Kind::Ceased(()),
                Status::Dropped => Kind::Dropped(()),
            }),
        }
    }

    fn from_proto(proto: proto_storage_response::ProtoStatus) -> Result<Self, TryFromProtoError> {
        use proto_storage_response::proto_status::*;
        let kind = proto
            .kind
            .ok_or_else(|| TryFromProtoError::missing_field("ProtoStatus::kind"))?;

        Ok(match kind {
            Kind::Starting(()) => Status::Starting,
            Kind::Running(()) => Status::Running,
            Kind::Paused(()) => Status::Paused,
            Kind::Stalled(()) => Status::Stalled,
            Kind::Ceased(()) => Status::Ceased,
            Kind::Dropped(()) => Status::Dropped,
        })
    }
}

impl RustType<proto_storage_response::ProtoStatusUpdate> for StatusUpdate {
    fn into_proto(&self) -> proto_storage_response::ProtoStatusUpdate {
        proto_storage_response::ProtoStatusUpdate {
            id: Some(self.id.into_proto()),
            status: Some(self.status.into_proto()),
            timestamp: Some(self.timestamp.into_proto()),
            error: self.error.clone(),
            hints: self.hints.iter().cloned().collect(),
            namespaced_errors: self.namespaced_errors.clone(),
            replica_id: self.replica_id.map(|id| id.to_string().into_proto()),
        }
    }

    fn from_proto(
        proto: proto_storage_response::ProtoStatusUpdate,
    ) -> Result<Self, TryFromProtoError> {
        Ok(StatusUpdate {
            id: proto.id.into_rust_if_some("ProtoStatusUpdate::id")?,
            timestamp: proto
                .timestamp
                .into_rust_if_some("ProtoStatusUpdate::timestamp")?,
            status: proto
                .status
                .into_rust_if_some("ProtoStatusUpdate::status")?,
            error: proto.error,
            hints: proto.hints.into_iter().collect(),
            namespaced_errors: proto.namespaced_errors,
            replica_id: proto
                .replica_id
                .map(|replica_id: String| replica_id.parse().expect("must be a valid replica id")),
        })
    }
}

/// An update to an append-only collection.
pub enum AppendOnlyUpdate {
    Row((Row, Diff)),
    Status(StatusUpdate),
}

impl AppendOnlyUpdate {
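    /// Converts this update into a `(Row, Diff)` pair; status updates are packed
    /// into a [`Row`] with a diff of [`Diff::ONE`].
    ///
    /// A brief illustrative sketch (not part of the original docs), where
    /// `status_update` is a hypothetical [`StatusUpdate`]:
    ///
    /// ```ignore
    /// let (row, diff) = AppendOnlyUpdate::Status(status_update).into_row();
    /// assert_eq!(diff, Diff::ONE);
    /// ```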
    pub fn into_row(self) -> (Row, Diff) {
        match self {
            AppendOnlyUpdate::Row((row, diff)) => (row, diff),
            AppendOnlyUpdate::Status(status) => (Row::from(status), Diff::ONE),
        }
    }
}

impl From<(Row, Diff)> for AppendOnlyUpdate {
    fn from((row, diff): (Row, Diff)) -> Self {
        Self::Row((row, diff))
    }
}

impl From<StatusUpdate> for AppendOnlyUpdate {
    fn from(update: StatusUpdate) -> Self {
        Self::Status(update)
    }
}

/// Responses that the storage nature of a worker/dataflow can provide back to the coordinator.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub enum StorageResponse<T = mz_repr::Timestamp> {
    /// A new upper frontier for the specified identifier.
    FrontierUpper(GlobalId, Antichain<T>),
    /// Punctuation indicating that no more responses will be transmitted for the specified id.
    DroppedId(GlobalId),
    /// Batches that have been staged in Persist and maybe will be linked into a shard.
    StagedBatches(BTreeMap<uuid::Uuid, Vec<Result<ProtoBatch, String>>>),
    /// A list of statistics updates for sources and sinks.
    StatisticsUpdates(Vec<SourceStatisticsUpdate>, Vec<SinkStatisticsUpdate>),
    /// A status update for a source or a sink. Periodically sent from
    /// storage workers to convey the latest status information about an object.
    StatusUpdate(StatusUpdate),
}

impl RustType<ProtoStorageResponse> for StorageResponse<mz_repr::Timestamp> {
    fn into_proto(&self) -> ProtoStorageResponse {
        use proto_storage_response::Kind::*;
        use proto_storage_response::{
            ProtoDroppedId, ProtoFrontierUpper, ProtoStagedBatches, ProtoStatisticsUpdates,
        };
        ProtoStorageResponse {
            kind: Some(match self {
                StorageResponse::FrontierUpper(id, upper) => FrontierUpper(ProtoFrontierUpper {
                    id: Some(id.into_proto()),
                    upper: Some(upper.into_proto()),
                }),
                StorageResponse::DroppedId(id) => DroppedId(ProtoDroppedId {
                    id: Some(id.into_proto()),
                }),
                StorageResponse::StatisticsUpdates(source_stats, sink_stats) => {
                    Stats(ProtoStatisticsUpdates {
                        source_updates: source_stats
                            .iter()
                            .map(|update| update.into_proto())
                            .collect(),
                        sink_updates: sink_stats
                            .iter()
                            .map(|update| update.into_proto())
                            .collect(),
                    })
                }
                StorageResponse::StatusUpdate(update) => StatusUpdate(update.into_proto()),
                StorageResponse::StagedBatches(staged) => {
                    let batches = staged
                        .into_iter()
                        .map(|(collection_id, batches)| {
                            let batches = batches
                                .into_iter()
                                .map(|result| {
                                    use proto_storage_response::proto_staged_batches::batch_result::Value;
                                    let value = match result {
                                        Ok(batch) => Value::Batch(batch.clone()),
                                        Err(err) => Value::Error(err.clone()),
                                    };
                                    proto_storage_response::proto_staged_batches::BatchResult { value: Some(value) }
                                })
                                .collect();
                            proto_storage_response::proto_staged_batches::Inner {
                                id: Some(collection_id.into_proto()),
                                batches,
                            }
                        })
                        .collect();
                    StagedBatches(ProtoStagedBatches { batches })
                }
            }),
        }
    }

    fn from_proto(proto: ProtoStorageResponse) -> Result<Self, TryFromProtoError> {
        use proto_storage_response::Kind::*;
        use proto_storage_response::{ProtoDroppedId, ProtoFrontierUpper};
        match proto.kind {
            Some(DroppedId(ProtoDroppedId { id })) => Ok(StorageResponse::DroppedId(
                id.into_rust_if_some("ProtoDroppedId::id")?,
            )),
            Some(FrontierUpper(ProtoFrontierUpper { id, upper })) => {
                Ok(StorageResponse::FrontierUpper(
                    id.into_rust_if_some("ProtoFrontierUpper::id")?,
                    upper.into_rust_if_some("ProtoFrontierUpper::upper")?,
                ))
            }
            Some(Stats(stats)) => Ok(StorageResponse::StatisticsUpdates(
                stats
                    .source_updates
                    .into_iter()
                    .map(|update| update.into_rust())
                    .collect::<Result<Vec<_>, TryFromProtoError>>()?,
                stats
                    .sink_updates
                    .into_iter()
                    .map(|update| update.into_rust())
                    .collect::<Result<Vec<_>, TryFromProtoError>>()?,
            )),
            Some(StatusUpdate(update)) => Ok(StorageResponse::StatusUpdate(update.into_rust()?)),
            Some(StagedBatches(staged)) => {
                let batches: BTreeMap<_, _> = staged
                    .batches
                    .into_iter()
                    .map(|inner| {
                        let id = inner
                            .id
                            .into_rust_if_some("ProtoStagedBatches::Inner::id")?;

                        let mut batches = Vec::with_capacity(inner.batches.len());
                        for maybe_batch in inner.batches {
                            use proto_storage_response::proto_staged_batches::batch_result::Value;

                            let value = maybe_batch.value.ok_or_else(|| {
                                TryFromProtoError::missing_field("BatchResult::value")
                            })?;
                            let batch = match value {
                                Value::Batch(batch) => Ok(batch),
                                Value::Error(err) => Err(err),
                            };
                            batches.push(batch);
                        }

                        Ok::<_, TryFromProtoError>((id, batches))
                    })
                    .collect::<Result<_, _>>()?;

                Ok(StorageResponse::StagedBatches(batches))
            }
            None => Err(TryFromProtoError::missing_field(
                "ProtoStorageResponse::kind",
            )),
        }
    }
}

impl Arbitrary for StorageResponse<mz_repr::Timestamp> {
    type Strategy = Union<BoxedStrategy<Self>>;
    type Parameters = ();

    fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
        // TODO(guswynn): test `SourceStatisticsUpdates`
        Union::new(vec![
            (any::<GlobalId>(), any_antichain())
                .prop_map(|(id, upper)| StorageResponse::FrontierUpper(id, upper))
                .boxed(),
        ])
    }
}

/// Maintained state for partitioned storage clients.
///
/// This helper type unifies the responses of multiple partitioned
/// workers in order to present as a single worker.
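///
/// # Example
///
/// An illustrative sketch (under the assumption of two partitions and a hypothetical
/// tracked collection `id`) of how `FrontierUpper` responses are merged: the unioned
/// frontier only advances once every partition has reported past the old bound, so a
/// single response is emitted downstream.
///
/// ```ignore
/// let r0 = state.absorb_response(0, StorageResponse::FrontierUpper(id, Antichain::from_elem(5)));
/// assert!(r0.is_none()); // partition 1 is still at the minimum timestamp
/// let r1 = state.absorb_response(1, StorageResponse::FrontierUpper(id, Antichain::from_elem(5)));
/// assert!(r1.is_some()); // now the unioned frontier has advanced to 5
/// ```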
#[derive(Debug)]
pub struct PartitionedStorageState<T> {
    /// Number of partitions the state machine represents.
    parts: usize,
    /// Upper frontiers for sources and sinks, both unioned across all partitions and from each
    /// individual partition.
    uppers: BTreeMap<GlobalId, (MutableAntichain<T>, Vec<Option<Antichain<T>>>)>,
    /// Staged batches from oneshot sources that will get appended by `environmentd`.
    oneshot_source_responses:
        BTreeMap<uuid::Uuid, BTreeMap<usize, Vec<Result<ProtoBatch, String>>>>,
}

impl<T> Partitionable<StorageCommand<T>, StorageResponse<T>>
    for (StorageCommand<T>, StorageResponse<T>)
where
    T: timely::progress::Timestamp + Lattice,
{
    type PartitionedState = PartitionedStorageState<T>;

    fn new(parts: usize) -> PartitionedStorageState<T> {
        PartitionedStorageState {
            parts,
            uppers: BTreeMap::new(),
            oneshot_source_responses: BTreeMap::new(),
        }
    }
}

impl<T> PartitionedStorageState<T>
where
    T: timely::progress::Timestamp,
{
    fn observe_command(&mut self, command: &StorageCommand<T>) {
        // Note that `observe_command` is quite different in `mz_compute_client`.
        // Compute (currently) only sends the command to 1 process,
        // but storage fans out to all workers, allowing the storage processes
        // to self-coordinate how commands and internal commands are ordered.
        //
        // TODO(guswynn): cluster-unification: consolidate this with compute.
        let _ = match command {
            StorageCommand::CreateTimely { .. } => {
                // Unlike compute, we don't reset state here, because, until we are
                // required to manage multiple replicas, we can keep track of state
                // across restarts of the storage server(s).
            }
            StorageCommand::RunIngestion(ingestion) => {
                self.insert_new_uppers(ingestion.description.collection_ids());
            }
            StorageCommand::RunSink(export) => {
                self.insert_new_uppers([export.id]);
            }
            StorageCommand::InitializationComplete
            | StorageCommand::AllowWrites
            | StorageCommand::UpdateConfiguration(_)
            | StorageCommand::AllowCompaction(_, _)
            | StorageCommand::RunOneshotIngestion(_)
            | StorageCommand::CancelOneshotIngestion { .. } => {}
        };
    }

    /// Installs upper-tracking state for the given collection IDs.
    ///
    /// IDs that are already tracked in `self` are skipped, leaving their existing
    /// uppers untouched.
    fn insert_new_uppers<I: IntoIterator<Item = GlobalId>>(&mut self, ids: I) {
        for id in ids {
            self.uppers.entry(id).or_insert_with(|| {
                let mut frontier = MutableAntichain::new();
                // TODO(guswynn): cluster-unification: fix this dangerous use of `as`, by
                // merging the types that compute and storage use.
                #[allow(clippy::as_conversions)]
                frontier.update_iter(iter::once((T::minimum(), self.parts as i64)));
                let part_frontiers = vec![Some(Antichain::from_elem(T::minimum())); self.parts];

                (frontier, part_frontiers)
            });
        }
    }
}

impl<T> PartitionedState<StorageCommand<T>, StorageResponse<T>> for PartitionedStorageState<T>
where
    T: timely::progress::Timestamp + Lattice,
{
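    /// Splits a command for distribution to the individual partitions.
    ///
    /// An illustrative sketch (not from the original docs), assuming a hypothetical
    /// `state` with 2 partitions: `CreateTimely` is specialized per partition, while
    /// every other command is fanned out unchanged to all partitions.
    ///
    /// ```ignore
    /// let parts = state.split_command(StorageCommand::InitializationComplete);
    /// assert_eq!(parts.len(), 2);
    /// assert!(parts.iter().all(|cmd| matches!(cmd, Some(StorageCommand::InitializationComplete))));
    /// ```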
    fn split_command(&mut self, command: StorageCommand<T>) -> Vec<Option<StorageCommand<T>>> {
        self.observe_command(&command);

        match command {
            StorageCommand::CreateTimely { config, epoch } => {
                let timely_cmds = config.split_command(self.parts);

                let timely_cmds = timely_cmds
                    .into_iter()
                    .map(|config| {
                        Some(StorageCommand::CreateTimely {
                            config: Box::new(config),
                            epoch,
                        })
                    })
                    .collect();
                timely_cmds
            }
            command => {
                // Fan out to all processes (which will fan out to all workers).
                // StorageState manages ordering of commands internally.
                vec![Some(command); self.parts]
            }
        }
    }

    fn absorb_response(
        &mut self,
        shard_id: usize,
        response: StorageResponse<T>,
    ) -> Option<Result<StorageResponse<T>, anyhow::Error>> {
        match response {
            // Avoid multiple retractions of minimum time, to present as updates from one worker.
            StorageResponse::FrontierUpper(id, new_shard_upper) => {
                let (frontier, shard_frontiers) = match self.uppers.get_mut(&id) {
                    Some(value) => value,
                    None => panic!("Reference to absent collection: {id}"),
                };
                let old_upper = frontier.frontier().to_owned();
                let shard_upper = match &mut shard_frontiers[shard_id] {
                    Some(shard_upper) => shard_upper,
                    None => panic!("Reference to absent shard {shard_id} for collection {id}"),
                };
                frontier.update_iter(shard_upper.iter().map(|t| (t.clone(), -1)));
                frontier.update_iter(new_shard_upper.iter().map(|t| (t.clone(), 1)));
                shard_upper.join_assign(&new_shard_upper);

                let new_upper = frontier.frontier();
                if PartialOrder::less_than(&old_upper.borrow(), &new_upper) {
                    Some(Ok(StorageResponse::FrontierUpper(id, new_upper.to_owned())))
                } else {
                    None
                }
            }
            StorageResponse::DroppedId(id) => {
                let (_, shard_frontiers) = match self.uppers.get_mut(&id) {
                    Some(value) => value,
                    None => panic!("Reference to absent collection: {id}"),
                };
                let prev = shard_frontiers[shard_id].take();
                assert!(
                    prev.is_some(),
                    "got double drop for {id} from shard {shard_id}"
                );

                if shard_frontiers.iter().all(Option::is_none) {
                    self.uppers.remove(&id);
                    Some(Ok(StorageResponse::DroppedId(id)))
                } else {
                    None
                }
            }
            StorageResponse::StatisticsUpdates(source_stats, sink_stats) => {
                // Just forward it along; the `worker_id` should have been set in `storage_state`.
                // We _could_ consolidate across worker_id's, here, but each worker only produces
                // responses periodically, so we avoid that complexity.
                Some(Ok(StorageResponse::StatisticsUpdates(
                    source_stats,
                    sink_stats,
                )))
            }
            StorageResponse::StatusUpdate(updates) => {
                Some(Ok(StorageResponse::StatusUpdate(updates)))
            }
            StorageResponse::StagedBatches(batches) => {
                let mut finished_batches = BTreeMap::new();

                for (collection_id, batches) in batches {
                    tracing::info!(%shard_id, %collection_id, "got batch");

                    let entry = self
                        .oneshot_source_responses
                        .entry(collection_id)
                        .or_default();
                    let novel = entry.insert(shard_id, batches);
                    assert_none!(novel, "Duplicate oneshot source response");

                    // Check if we've received responses from all shards.
                    if entry.len() == self.parts {
                        let entry = self
                            .oneshot_source_responses
                            .remove(&collection_id)
                            .expect("checked above");
                        let all_batches: Vec<_> = entry.into_values().flatten().collect();

                        finished_batches.insert(collection_id, all_batches);
                    }
                }

                if !finished_batches.is_empty() {
                    Some(Ok(StorageResponse::StagedBatches(finished_batches)))
                } else {
                    None
                }
            }
        }
    }
}

#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
/// A batch of updates to be fed to a local input
pub struct Update<T = mz_repr::Timestamp> {
    pub row: Row,
    pub timestamp: T,
    pub diff: Diff,
}

#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
/// A batch of updates to be fed to a local input; however, the input must
/// determine the most appropriate timestamps to use.
///
/// TODO(cf2): Can we remove this and use only [`TableData`]?
pub struct TimestamplessUpdate {
    pub row: Row,
    pub diff: Diff,
}

#[derive(Debug, Clone, PartialEq)]
pub enum TableData {
    /// Rows that still need to be persisted and appended.
    ///
    /// The contained [`Row`]s are _not_ consolidated.
    Rows(Vec<(Row, Diff)>),
    /// Batches already staged in Persist ready to be appended.
    Batches(SmallVec<[ProtoBatch; 1]>),
}

impl TableData {
    pub fn is_empty(&self) -> bool {
        match self {
            TableData::Rows(rows) => rows.is_empty(),
            TableData::Batches(batches) => batches.is_empty(),
        }
    }
}

/// A collection of timestamp-less updates. As updates are added to the builder
/// they are automatically spilled to blob storage.
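///
/// # Example
///
/// An illustrative sketch of the intended lifecycle, assuming `write_handle` is a
/// hypothetical [`WriteHandle`] for the destination shard and `key`, `val`, and `diff`
/// are hypothetical codec-compatible values.
///
/// ```ignore
/// let mut builder = TimestamplessUpdateBuilder::new(&write_handle);
/// builder.add(&key, &val, &diff).await;
/// let proto_batch: ProtoBatch = builder.finish().await;
/// // `proto_batch` can later be re-timestamped and linked into the shard.
/// ```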
pub struct TimestamplessUpdateBuilder<K, V, T, D>
where
    K: Codec,
    V: Codec,
    T: Timestamp + Lattice + Codec64,
    D: Codec64,
{
    builder: BatchBuilder<K, V, T, D>,
    initial_ts: T,
}

impl<K, V, T, D> TimestamplessUpdateBuilder<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: TimestampManipulation + Lattice + Codec64 + Sync,
    D: Semigroup + Ord + Codec64 + Send + Sync,
{
    /// Create a new [`TimestamplessUpdateBuilder`] for the shard associated
    /// with the provided [`WriteHandle`].
    pub fn new(handle: &WriteHandle<K, V, T, D>) -> Self {
        let initial_ts = T::minimum();
        let builder = handle.builder(Antichain::from_elem(initial_ts.clone()));
        TimestamplessUpdateBuilder {
            builder,
            initial_ts,
        }
    }

    /// Add a `(K, V, D)` to the staged batch.
    pub async fn add(&mut self, k: &K, v: &V, d: &D) {
        self.builder
            .add(k, v, &self.initial_ts, d)
            .await
            .expect("invalid Persist usage");
    }

    /// Finish the builder and return a [`ProtoBatch`] which can later be linked into a shard.
    ///
    /// The returned batch has nonsensical lower and upper bounds and must be re-written before
    /// appending into the destination shard.
    pub async fn finish(self) -> ProtoBatch {
        let finish_ts = StepForward::step_forward(&self.initial_ts);
        let batch = self
            .builder
            .finish(Antichain::from_elem(finish_ts))
            .await
            .expect("invalid Persist usage");

        batch.into_transmittable_batch()
    }
}

impl RustType<ProtoCompaction> for (GlobalId, Antichain<mz_repr::Timestamp>) {
    fn into_proto(&self) -> ProtoCompaction {
        ProtoCompaction {
            id: Some(self.0.into_proto()),
            frontier: Some(self.1.into_proto()),
        }
    }

    fn from_proto(proto: ProtoCompaction) -> Result<Self, TryFromProtoError> {
        Ok((
            proto.id.into_rust_if_some("ProtoCompaction::id")?,
            proto
                .frontier
                .into_rust_if_some("ProtoCompaction::frontier")?,
        ))
    }
}

impl TryIntoTimelyConfig for StorageCommand {
    fn try_into_timely_config(self) -> Result<(TimelyConfig, ClusterStartupEpoch), Self> {
        match self {
            StorageCommand::CreateTimely { config, epoch } => Ok((*config, epoch)),
            cmd => Err(cmd),
        }
    }
}

#[cfg(test)]
mod tests {
    use mz_ore::assert_ok;
    use mz_proto::protobuf_roundtrip;
    use proptest::prelude::ProptestConfig;
    use proptest::proptest;

    use super::*;

    /// Test to ensure the size of the `StorageCommand` enum doesn't regress.
    #[mz_ore::test]
    fn test_storage_command_size() {
        assert_eq!(std::mem::size_of::<StorageCommand>(), 40);
    }

    /// Test to ensure the size of the `StorageResponse` enum doesn't regress.
    #[mz_ore::test]
    fn test_storage_response_size() {
        assert_eq!(std::mem::size_of::<StorageResponse>(), 120);
    }

    proptest! {
        #![proptest_config(ProptestConfig::with_cases(32))]

        #[mz_ore::test]
        #[cfg_attr(miri, ignore)] // too slow
        fn storage_command_protobuf_roundtrip(expect in any::<StorageCommand<mz_repr::Timestamp>>() ) {
            let actual = protobuf_roundtrip::<_, ProtoStorageCommand>(&expect);
            assert_ok!(actual);
            assert_eq!(actual.unwrap(), expect);
        }

        #[mz_ore::test]
        #[cfg_attr(miri, ignore)] // too slow
        fn storage_response_protobuf_roundtrip(expect in any::<StorageResponse<mz_repr::Timestamp>>() ) {
            let actual = protobuf_roundtrip::<_, ProtoStorageResponse>(&expect);
            assert_ok!(actual);
            assert_eq!(actual.unwrap(), expect);
        }
    }
}