Skip to main content

mz_storage_client/
client.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10#![allow(missing_docs)]
11
12//! The public API of the storage layer.
13
14use std::collections::{BTreeMap, BTreeSet};
15use std::fmt::Debug;
16use std::iter;
17
18use async_trait::async_trait;
19use differential_dataflow::difference::Monoid;
20use differential_dataflow::lattice::Lattice;
21use mz_cluster_client::ReplicaId;
22use mz_cluster_client::client::TryIntoProtocolNonce;
23use mz_ore::assert_none;
24use mz_persist_client::batch::{BatchBuilder, ProtoBatch};
25use mz_persist_client::write::WriteHandle;
26use mz_persist_types::{Codec, Codec64, StepForward};
27use mz_repr::{Diff, GlobalId, Row, Timestamp};
28use mz_service::client::{GenericClient, Partitionable, PartitionedState};
29use mz_storage_types::controller::CollectionMetadata;
30use mz_storage_types::oneshot_sources::OneshotIngestionRequest;
31use mz_storage_types::parameters::StorageParameters;
32use mz_storage_types::sinks::StorageSinkDesc;
33use mz_storage_types::sources::IngestionDescription;
34use serde::{Deserialize, Serialize};
35use smallvec::SmallVec;
36use timely::PartialOrder;
37use timely::progress::frontier::{Antichain, MutableAntichain};
38use uuid::Uuid;
39
40use crate::statistics::{SinkStatisticsUpdate, SourceStatisticsUpdate};
41
/// A client to a storage server.
pub trait StorageClient: GenericClient<StorageCommand, StorageResponse> {}

/// Blanket implementation: anything that can exchange `StorageCommand`s and
/// `StorageResponse`s is a `StorageClient`.
impl<C> StorageClient for C where C: GenericClient<StorageCommand, StorageResponse> {}
46
#[async_trait]
impl GenericClient<StorageCommand, StorageResponse> for Box<dyn StorageClient> {
    /// Sends a command by delegating to the boxed client.
    async fn send(&mut self, cmd: StorageCommand) -> Result<(), anyhow::Error> {
        (**self).send(cmd).await
    }

    /// Receives the next response by delegating to the boxed client.
    ///
    /// # Cancel safety
    ///
    /// This method is cancel safe. If `recv` is used as the event in a [`tokio::select!`]
    /// statement and some other branch completes first, it is guaranteed that no messages were
    /// received by this client.
    async fn recv(&mut self) -> Result<Option<StorageResponse>, anyhow::Error> {
        // `GenericClient::recv` is required to be cancel safe.
        (**self).recv().await
    }
}
63
/// Commands related to the ingress and egress of collections.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub enum StorageCommand {
    /// Transmits connection meta information, before other commands are sent.
    Hello {
        /// Nonce identifying this connection; extracted by the
        /// `TryIntoProtocolNonce` impl in this module.
        nonce: Uuid,
    },
    /// Indicates that the controller has sent all commands reflecting its
    /// initial state.
    InitializationComplete,
    /// `AllowWrites` informs the replica that it can transition out of the
    /// read-only stage and into the read-write computation stage.
    /// It is now allowed to affect changes to external systems (writes).
    ///
    /// See `ComputeCommand::AllowWrites` for details. This command works
    /// analogously to the compute version.
    AllowWrites,
    /// Update storage instance configuration.
    UpdateConfiguration(Box<StorageParameters>),
    /// Run the specified ingestion dataflow.
    RunIngestion(Box<RunIngestionCommand>),
    /// Enable compaction in storage-managed collections.
    ///
    /// A collection id and a frontier after which accumulations must be correct.
    AllowCompaction(GlobalId, Antichain<Timestamp>),
    /// Run the specified sink dataflow.
    RunSink(Box<RunSinkCommand>),
    /// Run a dataflow which will ingest data from an external source and only __stage__ it in
    /// Persist.
    ///
    /// Unlike regular ingestions/sources, some other component (e.g. `environmentd`) is
    /// responsible for linking the staged data into a shard.
    RunOneshotIngestion(Box<RunOneshotIngestion>),
    /// `CancelOneshotIngestion` instructs the replica to cancel the identified oneshot ingestion.
    ///
    /// It is invalid to send a [`CancelOneshotIngestion`] command that references a oneshot
    /// ingestion that was not created by a corresponding [`RunOneshotIngestion`] command before.
    /// Doing so may cause the replica to exhibit undefined behavior.
    ///
    /// [`CancelOneshotIngestion`]: crate::client::StorageCommand::CancelOneshotIngestion
    /// [`RunOneshotIngestion`]: crate::client::StorageCommand::RunOneshotIngestion
    CancelOneshotIngestion(Uuid),
}
106
107impl StorageCommand {
108    /// Returns whether this command instructs the installation of storage objects.
109    pub fn installs_objects(&self) -> bool {
110        use StorageCommand::*;
111        match self {
112            Hello { .. }
113            | InitializationComplete
114            | AllowWrites
115            | UpdateConfiguration(_)
116            | AllowCompaction(_, _)
117            | CancelOneshotIngestion { .. } => false,
118            // TODO(cf2): multi-replica oneshot ingestions. At the moment returning
119            // true here means we can't run `COPY FROM` on multi-replica clusters, this
120            // should be easy enough to support though.
121            RunIngestion(_) | RunSink(_) | RunOneshotIngestion(_) => true,
122        }
123    }
124}
125
/// A command that starts ingesting the given ingestion description
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct RunIngestionCommand {
    /// The id of the storage collection being ingested.
    pub id: GlobalId,
    /// The description of what source type should be ingested and what post-processing steps must
    /// be applied to the data before writing them down into the storage collection
    pub description: IngestionDescription<CollectionMetadata>,
}
135
/// A command that runs a oneshot ingestion: data is read from an external
/// source and only staged as batches in Persist, to be linked into the target
/// collection's shard by some other component (see
/// `StorageCommand::RunOneshotIngestion`).
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct RunOneshotIngestion {
    /// The ID of the ingestion dataflow.
    pub ingestion_id: uuid::Uuid,
    /// The ID of collection we'll stage batches for.
    pub collection_id: GlobalId,
    /// Metadata for the collection we'll stage batches for.
    pub collection_meta: CollectionMetadata,
    /// Details for the oneshot ingestion.
    pub request: OneshotIngestionRequest,
}
148
/// A command that starts exporting the given sink description
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct RunSinkCommand {
    /// The id of the storage collection being exported.
    pub id: GlobalId,
    /// The description of the sink to run.
    pub description: StorageSinkDesc<CollectionMetadata>,
}
155
/// A "kind" enum for statuses tracked by the health operator
#[derive(
    Copy,
    Clone,
    Debug,
    Serialize,
    Deserialize,
    PartialEq,
    Eq,
    PartialOrd,
    Ord
)]
pub enum Status {
    Starting,
    Running,
    Paused,
    Stalled,
    /// This status is currently unused.
    // TODO: re-design the ceased status.
    Ceased,
    Dropped,
}
178
179impl std::str::FromStr for Status {
180    type Err = anyhow::Error;
181    /// Keep in sync with [`Status::to_str`].
182    fn from_str(s: &str) -> Result<Self, Self::Err> {
183        Ok(match s {
184            "starting" => Status::Starting,
185            "running" => Status::Running,
186            "paused" => Status::Paused,
187            "stalled" => Status::Stalled,
188            "ceased" => Status::Ceased,
189            "dropped" => Status::Dropped,
190            s => return Err(anyhow::anyhow!("{} is not a valid status", s)),
191        })
192    }
193}
194
195impl Status {
196    /// Keep in sync with `Status::from_str`.
197    pub fn to_str(&self) -> &'static str {
198        match self {
199            Status::Starting => "starting",
200            Status::Running => "running",
201            Status::Paused => "paused",
202            Status::Stalled => "stalled",
203            Status::Ceased => "ceased",
204            Status::Dropped => "dropped",
205        }
206    }
207
208    /// Determines if a new status should be produced in context of a previous
209    /// status.
210    pub fn superseded_by(self, new: Status) -> bool {
211        match (self, new) {
212            (_, Status::Dropped) => true,
213            (Status::Dropped, _) => false,
214            // Don't re-mark that object as paused.
215            (Status::Paused, Status::Paused) => false,
216            // De-duplication of other statuses is currently managed by the
217            // `health_operator`.
218            _ => true,
219        }
220    }
221}
222
/// A source or sink status update.
///
/// Represents a status update for a given object type. The inner value for each
/// variant should be able to be packed into a status row that conforms to the schema
/// for the object's status history relation.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct StatusUpdate {
    /// The ID of the source or sink this status describes.
    pub id: GlobalId,
    /// The new status of the object.
    pub status: Status,
    /// Wall-clock time at which the status was observed.
    pub timestamp: chrono::DateTime<chrono::Utc>,
    /// Optional error message accompanying the status.
    pub error: Option<String>,
    /// Hints surfaced alongside the status (packed into the row's detail map).
    pub hints: BTreeSet<String>,
    /// Additional namespaced error strings (packed into the row's detail map).
    pub namespaced_errors: BTreeMap<String, String>,
    /// The replica that produced this update, if any.
    pub replica_id: Option<ReplicaId>,
}
238
239impl StatusUpdate {
240    pub fn new(
241        id: GlobalId,
242        timestamp: chrono::DateTime<chrono::Utc>,
243        status: Status,
244    ) -> StatusUpdate {
245        StatusUpdate {
246            id,
247            timestamp,
248            status,
249            error: None,
250            hints: Default::default(),
251            namespaced_errors: Default::default(),
252            replica_id: None,
253        }
254    }
255}
256
impl From<StatusUpdate> for Row {
    /// Packs a status update into a `Row` conforming to the object's status
    /// history relation schema: timestamp, id, status, error, an optional
    /// details map (hints / namespaced errors), and an optional replica id.
    fn from(update: StatusUpdate) -> Self {
        use mz_repr::Datum;

        let timestamp = Datum::TimestampTz(update.timestamp.try_into().expect("must fit"));
        let id = update.id.to_string();
        let id = Datum::String(&id);
        let status = Datum::String(update.status.to_str());
        let error = update.error.as_deref().into();

        let mut row = Row::default();
        let mut packer = row.packer();
        packer.extend([timestamp, id, status, error]);

        // The details column: a map that is only present when there is
        // something to put in it, and NULL otherwise.
        if !update.hints.is_empty() || !update.namespaced_errors.is_empty() {
            packer.push_dict_with(|dict_packer| {
                // `hint` and `namespaced` are ordered,
                // as well as the BTree's they each contain.
                if !update.hints.is_empty() {
                    dict_packer.push(Datum::String("hints"));
                    dict_packer.push_list(update.hints.iter().map(|s| Datum::String(s)));
                }
                if !update.namespaced_errors.is_empty() {
                    dict_packer.push(Datum::String("namespaced"));
                    dict_packer.push_dict(
                        update
                            .namespaced_errors
                            .iter()
                            .map(|(k, v)| (k.as_str(), Datum::String(v))),
                    );
                }
            });
        } else {
            packer.push(Datum::Null);
        }

        // Trailing replica id column, NULL when the update is not
        // replica-specific.
        match update.replica_id {
            Some(id) => packer.push(Datum::String(&id.to_string())),
            None => packer.push(Datum::Null),
        }

        row
    }
}
301
/// An update to an append only collection.
pub enum AppendOnlyUpdate {
    /// A plain row/diff update.
    Row((Row, Diff)),
    /// A source or sink status update, convertible to a row.
    Status(StatusUpdate),
}
307
308impl AppendOnlyUpdate {
309    pub fn into_row(self) -> (Row, Diff) {
310        match self {
311            AppendOnlyUpdate::Row((row, diff)) => (row, diff),
312            AppendOnlyUpdate::Status(status) => (Row::from(status), Diff::ONE),
313        }
314    }
315}
316
/// Wraps a plain row/diff pair as an [`AppendOnlyUpdate::Row`].
impl From<(Row, Diff)> for AppendOnlyUpdate {
    fn from((row, diff): (Row, Diff)) -> Self {
        Self::Row((row, diff))
    }
}
322
/// Wraps a status update as an [`AppendOnlyUpdate::Status`].
impl From<StatusUpdate> for AppendOnlyUpdate {
    fn from(update: StatusUpdate) -> Self {
        Self::Status(update)
    }
}
328
/// Responses that the storage nature of a worker/dataflow can provide back to the coordinator.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub enum StorageResponse {
    /// A new upper frontier for the specified identifier.
    FrontierUpper(GlobalId, Antichain<Timestamp>),
    /// Punctuation indicates that no more responses will be transmitted for the specified id
    DroppedId(GlobalId),
    /// Batches that have been staged in Persist and maybe will be linked into a shard.
    ///
    /// Keyed by the oneshot ingestion's id; each batch may instead carry an
    /// error string describing why it could not be produced.
    StagedBatches(BTreeMap<uuid::Uuid, Vec<Result<ProtoBatch, String>>>),
    /// A list of statistics updates, currently only for sources.
    StatisticsUpdates(Vec<SourceStatisticsUpdate>, Vec<SinkStatisticsUpdate>),
    /// A status update for a source or a sink. Periodically sent from
    /// storage workers to convey the latest status information about an object.
    StatusUpdate(StatusUpdate),
}
344
/// Maintained state for partitioned storage clients.
///
/// This helper type unifies the responses of multiple partitioned
/// workers in order to present as a single worker.
#[derive(Debug)]
pub struct PartitionedStorageState {
    /// Number of partitions the state machine represents.
    parts: usize,
    /// Upper frontiers for sources and sinks, both unioned across all partitions and from each
    /// individual partition.
    ///
    /// A per-partition entry of `None` means that partition has reported the
    /// collection as dropped (see `absorb_response`).
    uppers: BTreeMap<
        GlobalId,
        (
            MutableAntichain<Timestamp>,
            Vec<Option<Antichain<Timestamp>>>,
        ),
    >,
    /// Staged batches from oneshot sources that will get appended by `environmentd`.
    ///
    /// Outer key is the oneshot ingestion id; inner map collects each
    /// partition's batches until all `parts` have reported.
    oneshot_source_responses:
        BTreeMap<uuid::Uuid, BTreeMap<usize, Vec<Result<ProtoBatch, String>>>>,
}
366
367impl Partitionable<StorageCommand, StorageResponse> for (StorageCommand, StorageResponse) {
368    type PartitionedState = PartitionedStorageState;
369
370    fn new(parts: usize) -> PartitionedStorageState {
371        PartitionedStorageState {
372            parts,
373            uppers: BTreeMap::new(),
374            oneshot_source_responses: BTreeMap::new(),
375        }
376    }
377}
378
impl PartitionedStorageState {
    /// Updates frontier-tracking state to account for a command that is about
    /// to be broadcast to all partitions.
    fn observe_command(&mut self, command: &StorageCommand) {
        // Note that `observe_command` is quite different in `mz_compute_client`.
        // Compute (currently) only sends the command to 1 process,
        // but storage fans out to all workers, allowing the storage processes
        // to self-coordinate how commands and internal commands are ordered.
        //
        // TODO(guswynn): cluster-unification: consolidate this with compute.
        match command {
            StorageCommand::Hello { .. } => {}
            StorageCommand::RunIngestion(ingestion) => {
                self.insert_new_uppers(ingestion.description.collection_ids());
            }
            StorageCommand::RunSink(export) => {
                self.insert_new_uppers([export.id]);
            }
            StorageCommand::InitializationComplete
            | StorageCommand::AllowWrites
            | StorageCommand::UpdateConfiguration(_)
            | StorageCommand::AllowCompaction(_, _)
            | StorageCommand::RunOneshotIngestion(_)
            | StorageCommand::CancelOneshotIngestion { .. } => {}
        }
    }

    /// Starts tracking frontiers for the given collection IDs, initializing
    /// the unioned frontier and every partition's upper to `Timestamp::MIN`.
    ///
    /// IDs that are already tracked are left untouched.
    fn insert_new_uppers<I: IntoIterator<Item = GlobalId>>(&mut self, ids: I) {
        for id in ids {
            self.uppers.entry(id).or_insert_with(|| {
                let mut frontier = MutableAntichain::new();
                // TODO(guswynn): cluster-unification: fix this dangerous use of `as`, by
                // merging the types that compute and storage use.
                #[allow(clippy::as_conversions)]
                frontier.update_iter(iter::once((Timestamp::MIN, self.parts as i64)));
                let part_frontiers = vec![Some(Antichain::from_elem(Timestamp::MIN)); self.parts];

                (frontier, part_frontiers)
            });
        }
    }
}
424
impl PartitionedState<StorageCommand, StorageResponse> for PartitionedStorageState {
    /// Prepares a command for broadcast to every partition, first recording
    /// any collections it installs in `self.uppers`.
    fn split_command(&mut self, command: StorageCommand) -> Vec<Option<StorageCommand>> {
        self.observe_command(&command);

        // Fan out to all processes (which will fan out to all workers).
        // StorageState manages ordering of commands internally.
        vec![Some(command); self.parts]
    }

    /// Merges a single partition's response into the unified state, returning
    /// `Some` only when a response should be surfaced as if from one worker.
    fn absorb_response(
        &mut self,
        shard_id: usize,
        response: StorageResponse,
    ) -> Option<Result<StorageResponse, anyhow::Error>> {
        match response {
            // Avoid multiple retractions of minimum time, to present as updates from one worker.
            StorageResponse::FrontierUpper(id, new_shard_upper) => {
                let (frontier, shard_frontiers) = match self.uppers.get_mut(&id) {
                    Some(value) => value,
                    None => panic!("Reference to absent collection: {id}"),
                };
                let old_upper = frontier.frontier().to_owned();
                let shard_upper = match &mut shard_frontiers[shard_id] {
                    Some(shard_upper) => shard_upper,
                    None => panic!("Reference to absent shard {shard_id} for collection {id}"),
                };
                // Swap this shard's contribution to the unioned frontier from
                // its old upper to its new one.
                frontier.update_iter(shard_upper.iter().map(|t| (*t, -1)));
                frontier.update_iter(new_shard_upper.iter().map(|t| (*t, 1)));
                shard_upper.join_assign(&new_shard_upper);

                let new_upper = frontier.frontier();
                // Only report an upper when the unioned frontier advanced.
                if PartialOrder::less_than(&old_upper.borrow(), &new_upper) {
                    Some(Ok(StorageResponse::FrontierUpper(id, new_upper.to_owned())))
                } else {
                    None
                }
            }
            StorageResponse::DroppedId(id) => {
                let (_, shard_frontiers) = match self.uppers.get_mut(&id) {
                    Some(value) => value,
                    None => panic!("Reference to absent collection: {id}"),
                };
                let prev = shard_frontiers[shard_id].take();
                assert!(
                    prev.is_some(),
                    "got double drop for {id} from shard {shard_id}"
                );

                // Only surface the drop once every shard has reported it.
                if shard_frontiers.iter().all(Option::is_none) {
                    self.uppers.remove(&id);
                    Some(Ok(StorageResponse::DroppedId(id)))
                } else {
                    None
                }
            }
            StorageResponse::StatisticsUpdates(source_stats, sink_stats) => {
                // Just forward it along; the `worker_id` should have been set in `storage_state`.
                // We _could_ consolidate across worker_id's, here, but each worker only produces
                // responses periodically, so we avoid that complexity.
                Some(Ok(StorageResponse::StatisticsUpdates(
                    source_stats,
                    sink_stats,
                )))
            }
            StorageResponse::StatusUpdate(updates) => {
                // Forwarded as-is; no cross-partition merging for status updates.
                Some(Ok(StorageResponse::StatusUpdate(updates)))
            }
            StorageResponse::StagedBatches(batches) => {
                let mut finished_batches = BTreeMap::new();

                for (collection_id, batches) in batches {
                    tracing::info!(%shard_id, %collection_id, "got batch");

                    let entry = self
                        .oneshot_source_responses
                        .entry(collection_id)
                        .or_default();
                    let novel = entry.insert(shard_id, batches);
                    assert_none!(novel, "Duplicate oneshot source response");

                    // Check if we've received responses from all shards.
                    if entry.len() == self.parts {
                        let entry = self
                            .oneshot_source_responses
                            .remove(&collection_id)
                            .expect("checked above");
                        let all_batches: Vec<_> = entry.into_values().flatten().collect();

                        finished_batches.insert(collection_id, all_batches);
                    }
                }

                // Surface only the collections for which every shard reported.
                if !finished_batches.is_empty() {
                    Some(Ok(StorageResponse::StagedBatches(finished_batches)))
                } else {
                    None
                }
            }
        }
    }
}
526
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
/// A batch of updates to be fed to a local input
pub struct Update {
    /// The row being updated.
    pub row: Row,
    /// The timestamp at which the update takes effect.
    pub timestamp: Timestamp,
    /// The multiplicity of the update (insertions/retractions).
    pub diff: Diff,
}
534
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
/// A batch of updates to be fed to a local input; however, the input must
/// determine the most appropriate timestamps to use.
///
/// TODO(cf2): Can we remove this and use only on [`TableData`].
pub struct TimestamplessUpdate {
    /// The row being updated.
    pub row: Row,
    /// The multiplicity of the update (insertions/retractions).
    pub diff: Diff,
}
544
/// Data destined for a table, either raw rows or batches already staged in
/// Persist.
#[derive(Debug, Clone, PartialEq)]
pub enum TableData {
    /// Rows that still need to be persisted and appended.
    ///
    /// The contained [`Row`]s are _not_ consolidated.
    Rows(Vec<(Row, Diff)>),
    /// Batches already staged in Persist ready to be appended.
    Batches(SmallVec<[ProtoBatch; 1]>),
}
554
555impl TableData {
556    pub fn is_empty(&self) -> bool {
557        match self {
558            TableData::Rows(rows) => rows.is_empty(),
559            TableData::Batches(batches) => batches.is_empty(),
560        }
561    }
562}
563
/// A collection of timestamp-less updates. As updates are added to the builder
/// they are automatically spilled to blob storage.
pub struct TimestamplessUpdateBuilder<K, V, D>
where
    K: Codec,
    V: Codec,
    D: Codec64,
{
    /// Underlying Persist batch builder that spills staged updates to blob
    /// storage.
    builder: BatchBuilder<K, V, Timestamp, D>,
    /// Placeholder timestamp assigned to every staged update; the batch's
    /// bounds must be re-written before it is appended to a shard.
    initial_ts: Timestamp,
}
575
impl<K, V, D> TimestamplessUpdateBuilder<K, V, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    D: Monoid + Ord + Codec64 + Send + Sync,
{
    /// Create a new [`TimestamplessUpdateBuilder`] for the shard associated
    /// with the provided [`WriteHandle`].
    pub fn new(handle: &WriteHandle<K, V, Timestamp, D>) -> Self {
        // All updates are staged at the minimum timestamp; real timestamps are
        // assigned when the batch is appended.
        let initial_ts = Timestamp::MIN;
        let builder = handle.builder(Antichain::from_elem(initial_ts));
        TimestamplessUpdateBuilder {
            builder,
            initial_ts,
        }
    }

    /// Add a `(K, V, D)` to the staged batch.
    pub async fn add(&mut self, k: &K, v: &V, d: &D) {
        self.builder
            .add(k, v, &self.initial_ts, d)
            .await
            .expect("invalid Persist usage");
    }

    /// Finish the builder and return a [`ProtoBatch`] which can later be linked into a shard.
    ///
    /// The returned batch has nonsensical lower and upper bounds and must be re-written before
    /// appending into the destination shard.
    pub async fn finish(self) -> ProtoBatch {
        // Close the batch one tick past the staging timestamp so the
        // description is well-formed (lower strictly less than upper).
        let finish_ts = StepForward::step_forward(&self.initial_ts);
        let batch = self
            .builder
            .finish(Antichain::from_elem(finish_ts))
            .await
            .expect("invalid Persist usage");

        batch.into_transmittable_batch()
    }
}
616
617impl TryIntoProtocolNonce for StorageCommand {
618    fn try_into_protocol_nonce(self) -> Result<Uuid, Self> {
619        match self {
620            StorageCommand::Hello { nonce } => Ok(nonce),
621            cmd => Err(cmd),
622        }
623    }
624}
625
626#[cfg(test)]
627mod tests {
628    use super::*;
629
    /// Test to ensure the size of the `StorageCommand` enum doesn't regress.
    #[mz_ore::test]
    fn test_storage_command_size() {
        // Large variant payloads are boxed, keeping the enum itself small;
        // update this value deliberately if the layout must change.
        assert_eq!(std::mem::size_of::<StorageCommand>(), 40);
    }
635
    /// Test to ensure the size of the `StorageResponse` enum doesn't regress.
    #[mz_ore::test]
    fn test_storage_response_size() {
        // Update this value deliberately if the enum's layout must change.
        assert_eq!(std::mem::size_of::<StorageResponse>(), 120);
    }
641}