mz_storage_client/
client.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

#![allow(missing_docs)]
// Tonic generates code that violates clippy lints.
// TODO: Remove this once tonic does not produce this code anymore.
#![allow(clippy::as_conversions, clippy::clone_on_ref_ptr)]

//! The public API of the storage layer.

use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Debug;
use std::iter;

use async_trait::async_trait;
use differential_dataflow::difference::Semigroup;
use differential_dataflow::lattice::Lattice;
use mz_cluster_client::ReplicaId;
use mz_cluster_client::client::TryIntoProtocolNonce;
use mz_ore::assert_none;
use mz_persist_client::batch::{BatchBuilder, ProtoBatch};
use mz_persist_client::write::WriteHandle;
use mz_persist_types::{Codec, Codec64, StepForward};
use mz_repr::{Diff, GlobalId, Row, TimestampManipulation};
use mz_service::client::{GenericClient, Partitionable, PartitionedState};
use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::oneshot_sources::OneshotIngestionRequest;
use mz_storage_types::parameters::StorageParameters;
use mz_storage_types::sinks::StorageSinkDesc;
use mz_storage_types::sources::IngestionDescription;
use serde::{Deserialize, Serialize};
use smallvec::SmallVec;
use timely::PartialOrder;
use timely::progress::Timestamp;
use timely::progress::frontier::{Antichain, MutableAntichain};
use uuid::Uuid;

use crate::statistics::{SinkStatisticsUpdate, SourceStatisticsUpdate};

/// A client to a storage server.
pub trait StorageClient<T = mz_repr::Timestamp>:
    GenericClient<StorageCommand<T>, StorageResponse<T>>
{
}

impl<C, T> StorageClient<T> for C where C: GenericClient<StorageCommand<T>, StorageResponse<T>> {}

#[async_trait]
impl<T: Send> GenericClient<StorageCommand<T>, StorageResponse<T>> for Box<dyn StorageClient<T>> {
    async fn send(&mut self, cmd: StorageCommand<T>) -> Result<(), anyhow::Error> {
        (**self).send(cmd).await
    }

    /// # Cancel safety
    ///
    /// This method is cancel safe. If `recv` is used as the event in a [`tokio::select!`]
    /// statement and some other branch completes first, it is guaranteed that no messages were
    /// received by this client.
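    ///
    /// A sketch of the intended pattern (not compiled as a doctest; `client`,
    /// `interval`, and the handler functions are hypothetical):
    ///
    /// ```ignore
    /// loop {
    ///     tokio::select! {
    ///         resp = client.recv() => handle_response(resp?),
    ///         _ = interval.tick() => do_periodic_work(),
    ///     }
    /// }
    /// ```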
    async fn recv(&mut self) -> Result<Option<StorageResponse<T>>, anyhow::Error> {
        // `GenericClient::recv` is required to be cancel safe.
        (**self).recv().await
    }
}

/// Commands related to the ingress and egress of collections.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub enum StorageCommand<T = mz_repr::Timestamp> {
    /// Transmits connection meta information, before other commands are sent.
    Hello {
        nonce: Uuid,
    },
    /// Indicates that the controller has sent all commands reflecting its
    /// initial state.
    InitializationComplete,
    /// `AllowWrites` informs the replica that it can transition out of the
    /// read-only stage and into the read-write computation stage.
    /// It is now allowed to effect changes to external systems (writes).
    ///
    /// See `ComputeCommand::AllowWrites` for details. This command works
    /// analogously to the compute version.
    AllowWrites,
    /// Update storage instance configuration.
    UpdateConfiguration(Box<StorageParameters>),
    /// Run the specified ingestion dataflow.
    RunIngestion(Box<RunIngestionCommand>),
    /// Enable compaction in storage-managed collections.
    ///
    /// A collection id and a frontier after which accumulations must be correct.
    AllowCompaction(GlobalId, Antichain<T>),
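    /// Run the specified sink dataflow.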
    RunSink(Box<RunSinkCommand<T>>),
    /// Run a dataflow which will ingest data from an external source and only __stage__ it in
    /// Persist.
    ///
    /// Unlike regular ingestions/sources, some other component (e.g. `environmentd`) is
    /// responsible for linking the staged data into a shard.
    RunOneshotIngestion(Box<RunOneshotIngestion>),
    /// `CancelOneshotIngestion` instructs the replica to cancel the identified oneshot ingestion.
    ///
    /// It is invalid to send a [`CancelOneshotIngestion`] command that references a oneshot
    /// ingestion that was not created by a corresponding [`RunOneshotIngestion`] command before.
    /// Doing so may cause the replica to exhibit undefined behavior.
    ///
    /// [`CancelOneshotIngestion`]: crate::client::StorageCommand::CancelOneshotIngestion
    /// [`RunOneshotIngestion`]: crate::client::StorageCommand::RunOneshotIngestion
    CancelOneshotIngestion(Uuid),
}

impl<T> StorageCommand<T> {
    /// Returns whether this command instructs the installation of storage objects.
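    ///
    /// A rough sketch of the intent (not compiled as a doctest; `ingestion` is a
    /// hypothetical `Box<RunIngestionCommand>`):
    ///
    /// ```ignore
    /// assert!(!StorageCommand::<mz_repr::Timestamp>::AllowWrites.installs_objects());
    /// assert!(StorageCommand::RunIngestion(ingestion).installs_objects());
    /// ```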
    pub fn installs_objects(&self) -> bool {
        use StorageCommand::*;
        match self {
            Hello { .. }
            | InitializationComplete
            | AllowWrites
            | UpdateConfiguration(_)
            | AllowCompaction(_, _)
            | CancelOneshotIngestion { .. } => false,
            // TODO(cf2): multi-replica oneshot ingestions. At the moment, returning
            // true here means we can't run `COPY FROM` on multi-replica clusters;
            // this should be easy enough to support, though.
            RunIngestion(_) | RunSink(_) | RunOneshotIngestion(_) => true,
        }
    }
}

/// A command that starts ingesting the given ingestion description
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct RunIngestionCommand {
    /// The id of the storage collection being ingested.
    pub id: GlobalId,
    /// The description of what source type should be ingested and what post-processing steps must
    /// be applied to the data before writing them down into the storage collection
    pub description: IngestionDescription<CollectionMetadata>,
}

/// A command that starts a oneshot ingestion, staging batches for the given collection
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct RunOneshotIngestion {
    /// The ID of the ingestion dataflow.
    pub ingestion_id: uuid::Uuid,
    /// The ID of the collection we'll stage batches for.
    pub collection_id: GlobalId,
    /// Metadata for the collection we'll stage batches for.
    pub collection_meta: CollectionMetadata,
    /// Details for the oneshot ingestion.
    pub request: OneshotIngestionRequest,
}

/// A command that starts exporting the given sink description
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct RunSinkCommand<T> {
    pub id: GlobalId,
    pub description: StorageSinkDesc<CollectionMetadata, T>,
}

/// A "kind" enum for statuses tracked by the health operator
#[derive(Copy, Clone, Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub enum Status {
    Starting,
    Running,
    Paused,
    Stalled,
    /// This status is currently unused.
    // TODO: re-design the ceased status
    Ceased,
    Dropped,
}

impl std::str::FromStr for Status {
    type Err = anyhow::Error;
    /// Keep in sync with [`Status::to_str`].
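    ///
    /// A quick sketch of the intended round-trip (not compiled as a doctest):
    ///
    /// ```ignore
    /// let status: Status = "running".parse().unwrap();
    /// assert_eq!(status, Status::Running);
    /// assert_eq!(status.to_str(), "running");
    /// ```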
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        Ok(match s {
            "starting" => Status::Starting,
            "running" => Status::Running,
            "paused" => Status::Paused,
            "stalled" => Status::Stalled,
            "ceased" => Status::Ceased,
            "dropped" => Status::Dropped,
            s => return Err(anyhow::anyhow!("{} is not a valid status", s)),
        })
    }
}

impl Status {
    /// Keep in sync with `Status::from_str`.
    pub fn to_str(&self) -> &'static str {
        match self {
            Status::Starting => "starting",
            Status::Running => "running",
            Status::Paused => "paused",
            Status::Stalled => "stalled",
            Status::Ceased => "ceased",
            Status::Dropped => "dropped",
        }
    }

    /// Determines if a new status should be produced in the context of a previous
    /// status.
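    ///
    /// A sketch of the intended semantics (not compiled as a doctest):
    ///
    /// ```ignore
    /// // `Dropped` supersedes everything, and nothing supersedes it afterwards.
    /// assert!(Status::Running.superseded_by(Status::Dropped));
    /// assert!(!Status::Dropped.superseded_by(Status::Running));
    /// // Repeated `Paused` updates are not re-emitted.
    /// assert!(!Status::Paused.superseded_by(Status::Paused));
    /// ```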
    pub fn superseded_by(self, new: Status) -> bool {
        match (self, new) {
            (_, Status::Dropped) => true,
            (Status::Dropped, _) => false,
            // Don't re-mark that object as paused.
            (Status::Paused, Status::Paused) => false,
            // De-duplication of other statuses is currently managed by the
            // `health_operator`.
            _ => true,
        }
    }
}

/// A source or sink status update.
///
/// Represents a status update for a given object type. The contained values should
/// be able to be packed into a status row that conforms to the schema for the
/// object's status history relation.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct StatusUpdate {
    pub id: GlobalId,
    pub status: Status,
    pub timestamp: chrono::DateTime<chrono::Utc>,
    pub error: Option<String>,
    pub hints: BTreeSet<String>,
    pub namespaced_errors: BTreeMap<String, String>,
    pub replica_id: Option<ReplicaId>,
}

impl StatusUpdate {
    pub fn new(
        id: GlobalId,
        timestamp: chrono::DateTime<chrono::Utc>,
        status: Status,
    ) -> StatusUpdate {
        StatusUpdate {
            id,
            timestamp,
            status,
            error: None,
            hints: Default::default(),
            namespaced_errors: Default::default(),
            replica_id: None,
        }
    }
}

impl From<StatusUpdate> for Row {
    fn from(update: StatusUpdate) -> Self {
        use mz_repr::Datum;

        let timestamp = Datum::TimestampTz(update.timestamp.try_into().expect("must fit"));
        let id = update.id.to_string();
        let id = Datum::String(&id);
        let status = Datum::String(update.status.to_str());
        let error = update.error.as_deref().into();

        let mut row = Row::default();
        let mut packer = row.packer();
        packer.extend([timestamp, id, status, error]);

        if !update.hints.is_empty() || !update.namespaced_errors.is_empty() {
            packer.push_dict_with(|dict_packer| {
                // The `hints` and `namespaced` keys are pushed in order,
                // as are the BTree collections they each contain.
                if !update.hints.is_empty() {
                    dict_packer.push(Datum::String("hints"));
                    dict_packer.push_list(update.hints.iter().map(|s| Datum::String(s)));
                }
                if !update.namespaced_errors.is_empty() {
                    dict_packer.push(Datum::String("namespaced"));
                    dict_packer.push_dict(
                        update
                            .namespaced_errors
                            .iter()
                            .map(|(k, v)| (k.as_str(), Datum::String(v))),
                    );
                }
            });
        } else {
            packer.push(Datum::Null);
        }

        match update.replica_id {
            Some(id) => packer.push(Datum::String(&id.to_string())),
            None => packer.push(Datum::Null),
        }

        row
    }
}

/// An update to an append-only collection.
pub enum AppendOnlyUpdate {
    Row((Row, Diff)),
    Status(StatusUpdate),
}

impl AppendOnlyUpdate {
    pub fn into_row(self) -> (Row, Diff) {
        match self {
            AppendOnlyUpdate::Row((row, diff)) => (row, diff),
            AppendOnlyUpdate::Status(status) => (Row::from(status), Diff::ONE),
        }
    }
}

impl From<(Row, Diff)> for AppendOnlyUpdate {
    fn from((row, diff): (Row, Diff)) -> Self {
        Self::Row((row, diff))
    }
}

impl From<StatusUpdate> for AppendOnlyUpdate {
    fn from(update: StatusUpdate) -> Self {
        Self::Status(update)
    }
}

/// Responses that the storage nature of a worker/dataflow can provide back to the coordinator.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub enum StorageResponse<T = mz_repr::Timestamp> {
    /// A new upper frontier for the specified identifier.
    FrontierUpper(GlobalId, Antichain<T>),
    /// Punctuation indicates that no more responses will be transmitted for the specified id
    DroppedId(GlobalId),
    /// Batches that have been staged in Persist and may later be linked into a shard.
    StagedBatches(BTreeMap<uuid::Uuid, Vec<Result<ProtoBatch, String>>>),
    /// A list of statistics updates for sources and sinks.
    StatisticsUpdates(Vec<SourceStatisticsUpdate>, Vec<SinkStatisticsUpdate>),
    /// A status update for a source or a sink. Periodically sent from
    /// storage workers to convey the latest status information about an object.
    StatusUpdate(StatusUpdate),
}

/// Maintained state for partitioned storage clients.
///
/// This helper type unifies the responses of multiple partitioned
/// workers in order to present as a single worker.
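///
/// A rough sketch (not compiled as a doctest; `cmd` and `resp` are hypothetical
/// values) of how the partitioning layer in `mz_service` drives this state through
/// the [`Partitionable`] and [`PartitionedState`] traits implemented below:
///
/// ```ignore
/// type Cmd = StorageCommand;
/// type Resp = StorageResponse;
/// let mut state = <(Cmd, Resp) as Partitionable<Cmd, Resp>>::new(2);
/// // Commands are fanned out unchanged to every partition...
/// assert_eq!(state.split_command(cmd).len(), 2);
/// // ...while per-partition responses are merged; `None` means "hold this back
/// // until the remaining partitions have reported".
/// let merged = state.absorb_response(0, resp);
/// ```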
#[derive(Debug)]
pub struct PartitionedStorageState<T> {
    /// Number of partitions the state machine represents.
    parts: usize,
    /// Upper frontiers for sources and sinks, both unioned across all partitions and from each
    /// individual partition.
    uppers: BTreeMap<GlobalId, (MutableAntichain<T>, Vec<Option<Antichain<T>>>)>,
    /// Staged batches from oneshot sources that will get appended by `environmentd`.
    oneshot_source_responses:
        BTreeMap<uuid::Uuid, BTreeMap<usize, Vec<Result<ProtoBatch, String>>>>,
}

impl<T> Partitionable<StorageCommand<T>, StorageResponse<T>>
    for (StorageCommand<T>, StorageResponse<T>)
where
    T: timely::progress::Timestamp + Lattice,
{
    type PartitionedState = PartitionedStorageState<T>;

    fn new(parts: usize) -> PartitionedStorageState<T> {
        PartitionedStorageState {
            parts,
            uppers: BTreeMap::new(),
            oneshot_source_responses: BTreeMap::new(),
        }
    }
}

impl<T> PartitionedStorageState<T>
where
    T: timely::progress::Timestamp,
{
    fn observe_command(&mut self, command: &StorageCommand<T>) {
        // Note that `observe_command` is quite different in `mz_compute_client`.
        // Compute (currently) only sends the command to 1 process,
        // but storage fans out to all workers, allowing the storage processes
        // to self-coordinate how commands and internal commands are ordered.
        //
        // TODO(guswynn): cluster-unification: consolidate this with compute.
        let _ = match command {
            StorageCommand::Hello { .. } => {}
            StorageCommand::RunIngestion(ingestion) => {
                self.insert_new_uppers(ingestion.description.collection_ids());
            }
            StorageCommand::RunSink(export) => {
                self.insert_new_uppers([export.id]);
            }
            StorageCommand::InitializationComplete
            | StorageCommand::AllowWrites
            | StorageCommand::UpdateConfiguration(_)
            | StorageCommand::AllowCompaction(_, _)
            | StorageCommand::RunOneshotIngestion(_)
            | StorageCommand::CancelOneshotIngestion { .. } => {}
        };
    }

    /// Shared implementation for commands that install frontier tracking state for
    /// the given collection IDs.
    ///
    /// IDs that are already tracked in `self` are skipped; their existing frontier
    /// state is left untouched.
    fn insert_new_uppers<I: IntoIterator<Item = GlobalId>>(&mut self, ids: I) {
        for id in ids {
            self.uppers.entry(id).or_insert_with(|| {
                let mut frontier = MutableAntichain::new();
                // TODO(guswynn): cluster-unification: fix this dangerous use of `as`, by
                // merging the types that compute and storage use.
                #[allow(clippy::as_conversions)]
                frontier.update_iter(iter::once((T::minimum(), self.parts as i64)));
                let part_frontiers = vec![Some(Antichain::from_elem(T::minimum())); self.parts];

                (frontier, part_frontiers)
            });
        }
    }
}

impl<T> PartitionedState<StorageCommand<T>, StorageResponse<T>> for PartitionedStorageState<T>
where
    T: timely::progress::Timestamp + Lattice,
{
    fn split_command(&mut self, command: StorageCommand<T>) -> Vec<Option<StorageCommand<T>>> {
        self.observe_command(&command);

        // Fan out to all processes (which will fan out to all workers).
        // StorageState manages ordering of commands internally.
        vec![Some(command); self.parts]
    }

    fn absorb_response(
        &mut self,
        shard_id: usize,
        response: StorageResponse<T>,
    ) -> Option<Result<StorageResponse<T>, anyhow::Error>> {
        match response {
            // Avoid multiple retractions of minimum time, to present as updates from one worker.
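            //
            // Worked example (hypothetical timestamps) with `parts == 2`: both
            // parts start at the minimum `[0]`. If part 0 reports an upper of
            // `[5]` while part 1 is still at `[0]`, the unioned frontier stays
            // at `[0]` and nothing is emitted; once part 1 also reports `[3]`,
            // the union advances to `[3]` and that is what we forward upstream.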
            StorageResponse::FrontierUpper(id, new_shard_upper) => {
                let (frontier, shard_frontiers) = match self.uppers.get_mut(&id) {
                    Some(value) => value,
                    None => panic!("Reference to absent collection: {id}"),
                };
                let old_upper = frontier.frontier().to_owned();
                let shard_upper = match &mut shard_frontiers[shard_id] {
                    Some(shard_upper) => shard_upper,
                    None => panic!("Reference to absent shard {shard_id} for collection {id}"),
                };
                frontier.update_iter(shard_upper.iter().map(|t| (t.clone(), -1)));
                frontier.update_iter(new_shard_upper.iter().map(|t| (t.clone(), 1)));
                shard_upper.join_assign(&new_shard_upper);

                let new_upper = frontier.frontier();
                if PartialOrder::less_than(&old_upper.borrow(), &new_upper) {
                    Some(Ok(StorageResponse::FrontierUpper(id, new_upper.to_owned())))
                } else {
                    None
                }
            }
            StorageResponse::DroppedId(id) => {
                let (_, shard_frontiers) = match self.uppers.get_mut(&id) {
                    Some(value) => value,
                    None => panic!("Reference to absent collection: {id}"),
                };
                let prev = shard_frontiers[shard_id].take();
                assert!(
                    prev.is_some(),
                    "got double drop for {id} from shard {shard_id}"
                );

                if shard_frontiers.iter().all(Option::is_none) {
                    self.uppers.remove(&id);
                    Some(Ok(StorageResponse::DroppedId(id)))
                } else {
                    None
                }
            }
            StorageResponse::StatisticsUpdates(source_stats, sink_stats) => {
                // Just forward it along; the `worker_id` should have been set in `storage_state`.
                // We _could_ consolidate across worker_id's, here, but each worker only produces
                // responses periodically, so we avoid that complexity.
                Some(Ok(StorageResponse::StatisticsUpdates(
                    source_stats,
                    sink_stats,
                )))
            }
            StorageResponse::StatusUpdate(updates) => {
                Some(Ok(StorageResponse::StatusUpdate(updates)))
            }
            StorageResponse::StagedBatches(batches) => {
                let mut finished_batches = BTreeMap::new();

                for (collection_id, batches) in batches {
                    tracing::info!(%shard_id, %collection_id, "got batch");

                    let entry = self
                        .oneshot_source_responses
                        .entry(collection_id)
                        .or_default();
                    let novel = entry.insert(shard_id, batches);
                    assert_none!(novel, "Duplicate oneshot source response");

                    // Check if we've received responses from all shards.
                    if entry.len() == self.parts {
                        let entry = self
                            .oneshot_source_responses
                            .remove(&collection_id)
                            .expect("checked above");
                        let all_batches: Vec<_> = entry.into_values().flatten().collect();

                        finished_batches.insert(collection_id, all_batches);
                    }
                }

                if !finished_batches.is_empty() {
                    Some(Ok(StorageResponse::StagedBatches(finished_batches)))
                } else {
                    None
                }
            }
        }
    }
}

#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
/// A batch of updates to be fed to a local input
pub struct Update<T = mz_repr::Timestamp> {
    pub row: Row,
    pub timestamp: T,
    pub diff: Diff,
}

#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
/// A batch of updates to be fed to a local input; however, the input must
/// determine the most appropriate timestamps to use.
///
/// TODO(cf2): Can we remove this and use only [`TableData`]?
pub struct TimestamplessUpdate {
    pub row: Row,
    pub diff: Diff,
}

#[derive(Debug, Clone, PartialEq)]
pub enum TableData {
    /// Rows that still need to be persisted and appended.
    ///
    /// The contained [`Row`]s are _not_ consolidated.
    Rows(Vec<(Row, Diff)>),
    /// Batches already staged in Persist ready to be appended.
    Batches(SmallVec<[ProtoBatch; 1]>),
}

impl TableData {
    pub fn is_empty(&self) -> bool {
        match self {
            TableData::Rows(rows) => rows.is_empty(),
            TableData::Batches(batches) => batches.is_empty(),
        }
    }
}

/// A collection of timestamp-less updates. As updates are added to the builder
/// they are automatically spilled to blob storage.
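///
/// A rough sketch of intended usage (not compiled as a doctest; `handle`, `key`,
/// and `val` are hypothetical):
///
/// ```ignore
/// let mut builder = TimestamplessUpdateBuilder::new(&handle);
/// builder.add(&key, &val, &Diff::ONE).await;
/// // The returned `ProtoBatch` gets its real timestamps when some other
/// // component links it into the destination shard.
/// let staged: ProtoBatch = builder.finish().await;
/// ```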
pub struct TimestamplessUpdateBuilder<K, V, T, D>
where
    K: Codec,
    V: Codec,
    T: Timestamp + Lattice + Codec64,
    D: Codec64,
{
    builder: BatchBuilder<K, V, T, D>,
    initial_ts: T,
}

impl<K, V, T, D> TimestamplessUpdateBuilder<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: TimestampManipulation + Lattice + Codec64 + Sync,
    D: Semigroup + Ord + Codec64 + Send + Sync,
{
    /// Create a new [`TimestamplessUpdateBuilder`] for the shard associated
    /// with the provided [`WriteHandle`].
    pub fn new(handle: &WriteHandle<K, V, T, D>) -> Self {
        let initial_ts = T::minimum();
        let builder = handle.builder(Antichain::from_elem(initial_ts.clone()));
        TimestamplessUpdateBuilder {
            builder,
            initial_ts,
        }
    }

    /// Add a `(K, V, D)` to the staged batch.
    pub async fn add(&mut self, k: &K, v: &V, d: &D) {
        self.builder
            .add(k, v, &self.initial_ts, d)
            .await
            .expect("invalid Persist usage");
    }

    /// Finish the builder and return a [`ProtoBatch`] which can later be linked into a shard.
    ///
    /// The returned batch has nonsensical lower and upper bounds and must be re-written before
    /// appending into the destination shard.
    pub async fn finish(self) -> ProtoBatch {
        let finish_ts = StepForward::step_forward(&self.initial_ts);
        let batch = self
            .builder
            .finish(Antichain::from_elem(finish_ts))
            .await
            .expect("invalid Persist usage");

        batch.into_transmittable_batch()
    }
}

impl TryIntoProtocolNonce for StorageCommand {
    fn try_into_protocol_nonce(self) -> Result<Uuid, Self> {
        match self {
            StorageCommand::Hello { nonce } => Ok(nonce),
            cmd => Err(cmd),
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Test to ensure the size of the `StorageCommand` enum doesn't regress.
    #[mz_ore::test]
    fn test_storage_command_size() {
        assert_eq!(std::mem::size_of::<StorageCommand>(), 40);
    }

    /// Test to ensure the size of the `StorageResponse` enum doesn't regress.
    #[mz_ore::test]
    fn test_storage_response_size() {
        assert_eq!(std::mem::size_of::<StorageResponse>(), 120);
    }
}