// mz_storage_client/client.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

#![allow(missing_docs)]

//! The public API of the storage layer.

use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Debug;
use std::iter;

use async_trait::async_trait;
use differential_dataflow::difference::Monoid;
use differential_dataflow::lattice::Lattice;
use mz_cluster_client::ReplicaId;
use mz_cluster_client::client::TryIntoProtocolNonce;
use mz_ore::assert_none;
use mz_persist_client::batch::{BatchBuilder, ProtoBatch};
use mz_persist_client::write::WriteHandle;
use mz_persist_types::{Codec, Codec64, StepForward};
use mz_repr::{Diff, GlobalId, Row, TimestampManipulation};
use mz_service::client::{GenericClient, Partitionable, PartitionedState};
use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::oneshot_sources::OneshotIngestionRequest;
use mz_storage_types::parameters::StorageParameters;
use mz_storage_types::sinks::StorageSinkDesc;
use mz_storage_types::sources::IngestionDescription;
use serde::{Deserialize, Serialize};
use smallvec::SmallVec;
use timely::PartialOrder;
use timely::progress::Timestamp;
use timely::progress::frontier::{Antichain, MutableAntichain};
use uuid::Uuid;

use crate::statistics::{SinkStatisticsUpdate, SourceStatisticsUpdate};

/// A client to a storage server.
pub trait StorageClient<T = mz_repr::Timestamp>:
    GenericClient<StorageCommand<T>, StorageResponse<T>>
{
}

impl<C, T> StorageClient<T> for C where C: GenericClient<StorageCommand<T>, StorageResponse<T>> {}

#[async_trait]
impl<T: Send> GenericClient<StorageCommand<T>, StorageResponse<T>> for Box<dyn StorageClient<T>> {
    async fn send(&mut self, cmd: StorageCommand<T>) -> Result<(), anyhow::Error> {
        (**self).send(cmd).await
    }

    /// # Cancel safety
    ///
    /// This method is cancel safe. If `recv` is used as the event in a [`tokio::select!`]
    /// statement and some other branch completes first, it is guaranteed that no messages were
    /// received by this client.
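    ///
    /// A minimal sketch of driving `recv` from a `tokio::select!` loop (not compiled as a
    /// doc test; `handle_response` and `shutdown` are assumptions about the caller's context):
    ///
    /// ```ignore
    /// loop {
    ///     tokio::select! {
    ///         // Because `recv` is cancel safe, losing this race to `shutdown` cannot
    ///         // drop an already-received response.
    ///         response = client.recv() => handle_response(response?),
    ///         _ = &mut shutdown => break,
    ///     }
    /// }
    /// ```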
    async fn recv(&mut self) -> Result<Option<StorageResponse<T>>, anyhow::Error> {
        // `GenericClient::recv` is required to be cancel safe.
        (**self).recv().await
    }
}

/// Commands related to the ingress and egress of collections.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub enum StorageCommand<T = mz_repr::Timestamp> {
    /// Transmits connection metadata before any other commands are sent.
    Hello {
        nonce: Uuid,
    },
    /// Indicates that the controller has sent all commands reflecting its
    /// initial state.
    InitializationComplete,
    /// `AllowWrites` informs the replica that it can transition out of the
    /// read-only stage and into the read-write computation stage.
    /// It is now allowed to effect changes to external systems (writes).
    ///
    /// See `ComputeCommand::AllowWrites` for details. This command works
    /// analogously to the compute version.
    AllowWrites,
    /// Update storage instance configuration.
    UpdateConfiguration(Box<StorageParameters>),
    /// Run the specified ingestion dataflow.
    RunIngestion(Box<RunIngestionCommand>),
    /// Enable compaction in storage-managed collections.
    ///
    /// A collection id and a frontier after which accumulations must be correct.
    AllowCompaction(GlobalId, Antichain<T>),
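    /// Run the specified sink dataflow.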
    RunSink(Box<RunSinkCommand<T>>),
    /// Run a dataflow which will ingest data from an external source and only __stage__ it in
    /// Persist.
    ///
    /// Unlike regular ingestions/sources, some other component (e.g. `environmentd`) is
    /// responsible for linking the staged data into a shard.
    RunOneshotIngestion(Box<RunOneshotIngestion>),
    /// `CancelOneshotIngestion` instructs the replica to cancel the identified oneshot ingestion.
    ///
    /// It is invalid to send a [`CancelOneshotIngestion`] command that references a oneshot
    /// ingestion that was not previously created by a corresponding [`RunOneshotIngestion`]
    /// command. Doing so may cause the replica to exhibit undefined behavior.
    ///
    /// [`CancelOneshotIngestion`]: crate::client::StorageCommand::CancelOneshotIngestion
    /// [`RunOneshotIngestion`]: crate::client::StorageCommand::RunOneshotIngestion
    CancelOneshotIngestion(Uuid),
}

impl<T> StorageCommand<T> {
    /// Returns whether this command instructs the installation of storage objects.
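    ///
    /// A minimal illustration (not compiled as a doc test; `ingestion` stands in for a
    /// [`RunIngestionCommand`]):
    ///
    /// ```ignore
    /// assert!(!StorageCommand::<mz_repr::Timestamp>::InitializationComplete.installs_objects());
    /// assert!(StorageCommand::RunIngestion(Box::new(ingestion)).installs_objects());
    /// ```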
    pub fn installs_objects(&self) -> bool {
        use StorageCommand::*;
        match self {
            Hello { .. }
            | InitializationComplete
            | AllowWrites
            | UpdateConfiguration(_)
            | AllowCompaction(_, _)
            | CancelOneshotIngestion { .. } => false,
            // TODO(cf2): multi-replica oneshot ingestions. At the moment returning true here
            // means we can't run `COPY FROM` on multi-replica clusters; this should be easy
            // enough to support, though.
            RunIngestion(_) | RunSink(_) | RunOneshotIngestion(_) => true,
        }
    }
}

/// A command that starts ingesting the given ingestion description.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct RunIngestionCommand {
    /// The id of the storage collection being ingested.
    pub id: GlobalId,
    /// The description of which source type should be ingested and what post-processing steps
    /// must be applied to the data before writing it to the storage collection.
    pub description: IngestionDescription<CollectionMetadata>,
}

/// A command that starts a oneshot ingestion, staging batches for the given collection.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct RunOneshotIngestion {
    /// The ID of the ingestion dataflow.
    pub ingestion_id: uuid::Uuid,
    /// The ID of the collection we'll stage batches for.
    pub collection_id: GlobalId,
    /// Metadata for the collection we'll stage batches for.
    pub collection_meta: CollectionMetadata,
    /// Details for the oneshot ingestion.
    pub request: OneshotIngestionRequest,
}

/// A command that starts exporting the given sink description.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct RunSinkCommand<T> {
    pub id: GlobalId,
    pub description: StorageSinkDesc<CollectionMetadata, T>,
}

/// A "kind" enum for statuses tracked by the health operator.
#[derive(Copy, Clone, Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub enum Status {
    Starting,
    Running,
    Paused,
    Stalled,
    /// This status is currently unused.
    // TODO: re-design the ceased status.
    Ceased,
    Dropped,
}

impl std::str::FromStr for Status {
    type Err = anyhow::Error;
    /// Keep in sync with [`Status::to_str`].
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        Ok(match s {
            "starting" => Status::Starting,
            "running" => Status::Running,
            "paused" => Status::Paused,
            "stalled" => Status::Stalled,
            "ceased" => Status::Ceased,
            "dropped" => Status::Dropped,
            s => return Err(anyhow::anyhow!("{} is not a valid status", s)),
        })
    }
}

impl Status {
    /// Keep in sync with `Status::from_str`.
    pub fn to_str(&self) -> &'static str {
        match self {
            Status::Starting => "starting",
            Status::Running => "running",
            Status::Paused => "paused",
            Status::Stalled => "stalled",
            Status::Ceased => "ceased",
            Status::Dropped => "dropped",
        }
    }

    /// Determines whether a new status should be produced in the context of a previous
    /// status.
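    ///
    /// A minimal illustration (not compiled as a doc test):
    ///
    /// ```ignore
    /// // `Dropped` supersedes every other status ...
    /// assert!(Status::Running.superseded_by(Status::Dropped));
    /// // ... and nothing supersedes `Dropped`.
    /// assert!(!Status::Dropped.superseded_by(Status::Running));
    /// // A repeated `Paused` is not re-emitted.
    /// assert!(!Status::Paused.superseded_by(Status::Paused));
    /// ```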
    pub fn superseded_by(self, new: Status) -> bool {
        match (self, new) {
            (_, Status::Dropped) => true,
            (Status::Dropped, _) => false,
            // Don't re-mark the object as paused.
            (Status::Paused, Status::Paused) => false,
            // De-duplication of other statuses is currently managed by the
            // `health_operator`.
            _ => true,
        }
    }
}

/// A source or sink status update.
///
/// Represents a status update for a given object type. The contents should be able to be
/// packed into a status row that conforms to the schema for the object's status history
/// relation.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct StatusUpdate {
    pub id: GlobalId,
    pub status: Status,
    pub timestamp: chrono::DateTime<chrono::Utc>,
    pub error: Option<String>,
    pub hints: BTreeSet<String>,
    pub namespaced_errors: BTreeMap<String, String>,
    pub replica_id: Option<ReplicaId>,
}

impl StatusUpdate {
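    /// Creates a [`StatusUpdate`] for the given object, time, and status, with no error,
    /// hints, namespaced errors, or replica ID.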
    pub fn new(
        id: GlobalId,
        timestamp: chrono::DateTime<chrono::Utc>,
        status: Status,
    ) -> StatusUpdate {
        StatusUpdate {
            id,
            timestamp,
            status,
            error: None,
            hints: Default::default(),
            namespaced_errors: Default::default(),
            replica_id: None,
        }
    }
}

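/// Packs a [`StatusUpdate`] into a [`Row`] for the object's status history relation.
///
/// The packed datums are, in order: the timestamp, the object ID, the status string, the
/// optional error, an optional map holding `hints` and `namespaced` errors (`Null` when both
/// are empty), and the optional replica ID.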
impl From<StatusUpdate> for Row {
    fn from(update: StatusUpdate) -> Self {
        use mz_repr::Datum;

        let timestamp = Datum::TimestampTz(update.timestamp.try_into().expect("must fit"));
        let id = update.id.to_string();
        let id = Datum::String(&id);
        let status = Datum::String(update.status.to_str());
        let error = update.error.as_deref().into();

        let mut row = Row::default();
        let mut packer = row.packer();
        packer.extend([timestamp, id, status, error]);

        if !update.hints.is_empty() || !update.namespaced_errors.is_empty() {
            packer.push_dict_with(|dict_packer| {
                // `hints` and `namespaced` are ordered, as are the BTrees they each contain.
                if !update.hints.is_empty() {
                    dict_packer.push(Datum::String("hints"));
                    dict_packer.push_list(update.hints.iter().map(|s| Datum::String(s)));
                }
                if !update.namespaced_errors.is_empty() {
                    dict_packer.push(Datum::String("namespaced"));
                    dict_packer.push_dict(
                        update
                            .namespaced_errors
                            .iter()
                            .map(|(k, v)| (k.as_str(), Datum::String(v))),
                    );
                }
            });
        } else {
            packer.push(Datum::Null);
        }

        match update.replica_id {
            Some(id) => packer.push(Datum::String(&id.to_string())),
            None => packer.push(Datum::Null),
        }

        row
    }
}

/// An update to an append-only collection.
pub enum AppendOnlyUpdate {
    Row((Row, Diff)),
    Status(StatusUpdate),
}

impl AppendOnlyUpdate {
    pub fn into_row(self) -> (Row, Diff) {
        match self {
            AppendOnlyUpdate::Row((row, diff)) => (row, diff),
            AppendOnlyUpdate::Status(status) => (Row::from(status), Diff::ONE),
        }
    }
}

impl From<(Row, Diff)> for AppendOnlyUpdate {
    fn from((row, diff): (Row, Diff)) -> Self {
        Self::Row((row, diff))
    }
}

impl From<StatusUpdate> for AppendOnlyUpdate {
    fn from(update: StatusUpdate) -> Self {
        Self::Status(update)
    }
}

/// Responses that the storage part of a worker/dataflow can provide back to the coordinator.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub enum StorageResponse<T = mz_repr::Timestamp> {
    /// A new upper frontier for the specified identifier.
    FrontierUpper(GlobalId, Antichain<T>),
    /// Punctuation indicating that no more responses will be transmitted for the specified id.
    DroppedId(GlobalId),
    /// Batches that have been staged in Persist and may later be linked into a shard.
    StagedBatches(BTreeMap<uuid::Uuid, Vec<Result<ProtoBatch, String>>>),
    /// A list of statistics updates for sources and sinks.
    StatisticsUpdates(Vec<SourceStatisticsUpdate>, Vec<SinkStatisticsUpdate>),
    /// A status update for a source or a sink. Periodically sent from
    /// storage workers to convey the latest status information about an object.
    StatusUpdate(StatusUpdate),
}

/// Maintained state for partitioned storage clients.
///
/// This helper type unifies the responses of multiple partitioned
/// workers in order to present them as those of a single worker.
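///
/// A minimal sketch of the intended behavior (not compiled as a doc test; `state` is a
/// two-part [`PartitionedStorageState`] already tracking collection `id`, and `ts` is an
/// assumed helper that builds a timestamp):
///
/// ```ignore
/// let resp = StorageResponse::FrontierUpper(id, Antichain::from_elem(ts(5)));
/// // Shard 0 reports an upper of [5], but shard 1 is still at the minimum, so the unioned
/// // frontier has not advanced and nothing is reported upward.
/// assert!(state.absorb_response(0, resp.clone()).is_none());
/// // Once shard 1 catches up, the union advances to [5] and a single response is emitted.
/// assert!(state.absorb_response(1, resp).is_some());
/// ```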
#[derive(Debug)]
pub struct PartitionedStorageState<T> {
    /// Number of partitions the state machine represents.
    parts: usize,
    /// Upper frontiers for sources and sinks, both unioned across all partitions and from each
    /// individual partition.
    uppers: BTreeMap<GlobalId, (MutableAntichain<T>, Vec<Option<Antichain<T>>>)>,
    /// Staged batches from oneshot sources that will get appended by `environmentd`.
    oneshot_source_responses:
        BTreeMap<uuid::Uuid, BTreeMap<usize, Vec<Result<ProtoBatch, String>>>>,
}

impl<T> Partitionable<StorageCommand<T>, StorageResponse<T>>
    for (StorageCommand<T>, StorageResponse<T>)
where
    T: timely::progress::Timestamp + Lattice,
{
    type PartitionedState = PartitionedStorageState<T>;

    fn new(parts: usize) -> PartitionedStorageState<T> {
        PartitionedStorageState {
            parts,
            uppers: BTreeMap::new(),
            oneshot_source_responses: BTreeMap::new(),
        }
    }
}

impl<T> PartitionedStorageState<T>
where
    T: timely::progress::Timestamp,
{
    fn observe_command(&mut self, command: &StorageCommand<T>) {
        // Note that `observe_command` is quite different in `mz_compute_client`.
        // Compute (currently) only sends the command to 1 process,
        // but storage fans out to all workers, allowing the storage processes
        // to self-coordinate how commands and internal commands are ordered.
        //
        // TODO(guswynn): cluster-unification: consolidate this with compute.
        let _ = match command {
            StorageCommand::Hello { .. } => {}
            StorageCommand::RunIngestion(ingestion) => {
                self.insert_new_uppers(ingestion.description.collection_ids());
            }
            StorageCommand::RunSink(export) => {
                self.insert_new_uppers([export.id]);
            }
            StorageCommand::InitializationComplete
            | StorageCommand::AllowWrites
            | StorageCommand::UpdateConfiguration(_)
            | StorageCommand::AllowCompaction(_, _)
            | StorageCommand::RunOneshotIngestion(_)
            | StorageCommand::CancelOneshotIngestion { .. } => {}
        };
    }

    /// Installs upper-tracking state for the given collection IDs.
    ///
    /// IDs that are already tracked in `self` keep their existing state; only previously
    /// unknown IDs are initialized to the minimum frontier.
    fn insert_new_uppers<I: IntoIterator<Item = GlobalId>>(&mut self, ids: I) {
        for id in ids {
            self.uppers.entry(id).or_insert_with(|| {
                let mut frontier = MutableAntichain::new();
                // TODO(guswynn): cluster-unification: fix this dangerous use of `as`, by
                // merging the types that compute and storage use.
                #[allow(clippy::as_conversions)]
                frontier.update_iter(iter::once((T::minimum(), self.parts as i64)));
                let part_frontiers = vec![Some(Antichain::from_elem(T::minimum())); self.parts];

                (frontier, part_frontiers)
            });
        }
    }
}

impl<T> PartitionedState<StorageCommand<T>, StorageResponse<T>> for PartitionedStorageState<T>
where
    T: timely::progress::Timestamp + Lattice,
{
    fn split_command(&mut self, command: StorageCommand<T>) -> Vec<Option<StorageCommand<T>>> {
        self.observe_command(&command);

        // Fan out to all processes (which will fan out to all workers).
        // StorageState manages ordering of commands internally.
        vec![Some(command); self.parts]
    }

    fn absorb_response(
        &mut self,
        shard_id: usize,
        response: StorageResponse<T>,
    ) -> Option<Result<StorageResponse<T>, anyhow::Error>> {
        match response {
            // Avoid multiple retractions of minimum time, to present as updates from one worker.
            StorageResponse::FrontierUpper(id, new_shard_upper) => {
                let (frontier, shard_frontiers) = match self.uppers.get_mut(&id) {
                    Some(value) => value,
                    None => panic!("Reference to absent collection: {id}"),
                };
                let old_upper = frontier.frontier().to_owned();
                let shard_upper = match &mut shard_frontiers[shard_id] {
                    Some(shard_upper) => shard_upper,
                    None => panic!("Reference to absent shard {shard_id} for collection {id}"),
                };
                frontier.update_iter(shard_upper.iter().map(|t| (t.clone(), -1)));
                frontier.update_iter(new_shard_upper.iter().map(|t| (t.clone(), 1)));
                shard_upper.join_assign(&new_shard_upper);

                let new_upper = frontier.frontier();
                if PartialOrder::less_than(&old_upper.borrow(), &new_upper) {
                    Some(Ok(StorageResponse::FrontierUpper(id, new_upper.to_owned())))
                } else {
                    None
                }
            }
            StorageResponse::DroppedId(id) => {
                let (_, shard_frontiers) = match self.uppers.get_mut(&id) {
                    Some(value) => value,
                    None => panic!("Reference to absent collection: {id}"),
                };
                let prev = shard_frontiers[shard_id].take();
                assert!(
                    prev.is_some(),
                    "got double drop for {id} from shard {shard_id}"
                );

                if shard_frontiers.iter().all(Option::is_none) {
                    self.uppers.remove(&id);
                    Some(Ok(StorageResponse::DroppedId(id)))
                } else {
                    None
                }
            }
            StorageResponse::StatisticsUpdates(source_stats, sink_stats) => {
                // Just forward it along; the `worker_id` should have been set in `storage_state`.
                // We _could_ consolidate across worker_ids here, but each worker only produces
                // responses periodically, so we avoid that complexity.
                Some(Ok(StorageResponse::StatisticsUpdates(
                    source_stats,
                    sink_stats,
                )))
            }
            StorageResponse::StatusUpdate(updates) => {
                Some(Ok(StorageResponse::StatusUpdate(updates)))
            }
            StorageResponse::StagedBatches(batches) => {
                let mut finished_batches = BTreeMap::new();

                for (collection_id, batches) in batches {
                    tracing::info!(%shard_id, %collection_id, "got batch");

                    let entry = self
                        .oneshot_source_responses
                        .entry(collection_id)
                        .or_default();
                    let novel = entry.insert(shard_id, batches);
                    assert_none!(novel, "Duplicate oneshot source response");

                    // Check if we've received responses from all shards.
                    if entry.len() == self.parts {
                        let entry = self
                            .oneshot_source_responses
                            .remove(&collection_id)
                            .expect("checked above");
                        let all_batches: Vec<_> = entry.into_values().flatten().collect();

                        finished_batches.insert(collection_id, all_batches);
                    }
                }

                if !finished_batches.is_empty() {
                    Some(Ok(StorageResponse::StagedBatches(finished_batches)))
                } else {
                    None
                }
            }
        }
    }
}

#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
/// A batch of updates to be fed to a local input.
pub struct Update<T = mz_repr::Timestamp> {
    pub row: Row,
    pub timestamp: T,
    pub diff: Diff,
}

#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
/// A batch of updates to be fed to a local input; however, the input must
/// determine the most appropriate timestamps to use.
///
/// TODO(cf2): Can we remove this and use only [`TableData`]?
pub struct TimestamplessUpdate {
    pub row: Row,
    pub diff: Diff,
}

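/// Data for a table update: either rows that still need to be written or batches that have
/// already been staged in Persist.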
#[derive(Debug, Clone, PartialEq)]
pub enum TableData {
    /// Rows that still need to be persisted and appended.
    ///
    /// The contained [`Row`]s are _not_ consolidated.
    Rows(Vec<(Row, Diff)>),
    /// Batches already staged in Persist, ready to be appended.
    Batches(SmallVec<[ProtoBatch; 1]>),
}

impl TableData {
    pub fn is_empty(&self) -> bool {
        match self {
            TableData::Rows(rows) => rows.is_empty(),
            TableData::Batches(batches) => batches.is_empty(),
        }
    }
}

/// A collection of timestamp-less updates. As updates are added to the builder,
/// they are automatically spilled to blob storage.
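///
/// A minimal usage sketch (not compiled as a doc test; `handle`, `k`, and `v` are assumed to
/// come from the surrounding context):
///
/// ```ignore
/// let mut builder = TimestamplessUpdateBuilder::new(&handle);
/// builder.add(&k, &v, &Diff::ONE).await;
/// // The returned `ProtoBatch` is only staged in Persist; some other component must
/// // re-timestamp it and link it into the destination shard.
/// let staged: ProtoBatch = builder.finish().await;
/// ```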
pub struct TimestamplessUpdateBuilder<K, V, T, D>
where
    K: Codec,
    V: Codec,
    T: Timestamp + Lattice + Codec64,
    D: Codec64,
{
    builder: BatchBuilder<K, V, T, D>,
    initial_ts: T,
}

impl<K, V, T, D> TimestamplessUpdateBuilder<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: TimestampManipulation + Lattice + Codec64 + Sync,
    D: Monoid + Ord + Codec64 + Send + Sync,
{
    /// Create a new [`TimestamplessUpdateBuilder`] for the shard associated
    /// with the provided [`WriteHandle`].
    pub fn new(handle: &WriteHandle<K, V, T, D>) -> Self {
        let initial_ts = T::minimum();
        let builder = handle.builder(Antichain::from_elem(initial_ts.clone()));
        TimestamplessUpdateBuilder {
            builder,
            initial_ts,
        }
    }

    /// Add a `(K, V, D)` to the staged batch.
    pub async fn add(&mut self, k: &K, v: &V, d: &D) {
        self.builder
            .add(k, v, &self.initial_ts, d)
            .await
            .expect("invalid Persist usage");
    }

    /// Finish the builder and return a [`ProtoBatch`] which can later be linked into a shard.
    ///
    /// The returned batch has nonsensical lower and upper bounds and must be re-written before
    /// appending into the destination shard.
    pub async fn finish(self) -> ProtoBatch {
        let finish_ts = StepForward::step_forward(&self.initial_ts);
        let batch = self
            .builder
            .finish(Antichain::from_elem(finish_ts))
            .await
            .expect("invalid Persist usage");

        batch.into_transmittable_batch()
    }
}

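/// Extracts the protocol nonce from a [`StorageCommand::Hello`], returning any other command
/// unchanged as the error.
///
/// A minimal illustration (not compiled as a doc test):
///
/// ```ignore
/// let nonce = Uuid::new_v4();
/// assert_eq!(StorageCommand::Hello { nonce }.try_into_protocol_nonce(), Ok(nonce));
/// assert!(
///     StorageCommand::<mz_repr::Timestamp>::InitializationComplete
///         .try_into_protocol_nonce()
///         .is_err()
/// );
/// ```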
impl TryIntoProtocolNonce for StorageCommand {
    fn try_into_protocol_nonce(self) -> Result<Uuid, Self> {
        match self {
            StorageCommand::Hello { nonce } => Ok(nonce),
            cmd => Err(cmd),
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Test to ensure the size of the `StorageCommand` enum doesn't regress.
    #[mz_ore::test]
    fn test_storage_command_size() {
        assert_eq!(std::mem::size_of::<StorageCommand>(), 40);
    }

    /// Test to ensure the size of the `StorageResponse` enum doesn't regress.
    #[mz_ore::test]
    fn test_storage_response_size() {
        assert_eq!(std::mem::size_of::<StorageResponse>(), 120);
    }
}