mz_storage_client/
client.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

#![allow(missing_docs)]

//! The public API of the storage layer.

use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Debug;
use std::iter;

use async_trait::async_trait;
use differential_dataflow::difference::Semigroup;
use differential_dataflow::lattice::Lattice;
use mz_cluster_client::ReplicaId;
use mz_cluster_client::client::TryIntoProtocolNonce;
use mz_ore::assert_none;
use mz_persist_client::batch::{BatchBuilder, ProtoBatch};
use mz_persist_client::write::WriteHandle;
use mz_persist_types::{Codec, Codec64, StepForward};
use mz_repr::{Diff, GlobalId, Row, TimestampManipulation};
use mz_service::client::{GenericClient, Partitionable, PartitionedState};
use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::oneshot_sources::OneshotIngestionRequest;
use mz_storage_types::parameters::StorageParameters;
use mz_storage_types::sinks::StorageSinkDesc;
use mz_storage_types::sources::IngestionDescription;
use serde::{Deserialize, Serialize};
use smallvec::SmallVec;
use timely::PartialOrder;
use timely::progress::Timestamp;
use timely::progress::frontier::{Antichain, MutableAntichain};
use uuid::Uuid;

use crate::statistics::{SinkStatisticsUpdate, SourceStatisticsUpdate};

/// A client to a storage server.
pub trait StorageClient<T = mz_repr::Timestamp>:
    GenericClient<StorageCommand<T>, StorageResponse<T>>
{
}

impl<C, T> StorageClient<T> for C where C: GenericClient<StorageCommand<T>, StorageResponse<T>> {}

#[async_trait]
impl<T: Send> GenericClient<StorageCommand<T>, StorageResponse<T>> for Box<dyn StorageClient<T>> {
    async fn send(&mut self, cmd: StorageCommand<T>) -> Result<(), anyhow::Error> {
        (**self).send(cmd).await
    }

    /// # Cancel safety
    ///
    /// This method is cancel safe. If `recv` is used as the event in a [`tokio::select!`]
    /// statement and some other branch completes first, it is guaranteed that no messages were
    /// received by this client.
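    ///
    /// A minimal sketch of the intended pattern (illustrative only; `client`,
    /// `shutdown`, and `handle_response` are assumed to exist in the caller's
    /// context):
    ///
    /// ```ignore
    /// tokio::select! {
    ///     response = client.recv() => handle_response(response?),
    ///     _ = shutdown.changed() => return Ok(()),
    /// }
    /// ```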
    async fn recv(&mut self) -> Result<Option<StorageResponse<T>>, anyhow::Error> {
        // `GenericClient::recv` is required to be cancel safe.
        (**self).recv().await
    }
}

/// Commands related to the ingress and egress of collections.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub enum StorageCommand<T = mz_repr::Timestamp> {
    /// Transmits connection metadata before other commands are sent.
    Hello {
        nonce: Uuid,
    },
    /// Indicates that the controller has sent all commands reflecting its
    /// initial state.
    InitializationComplete,
    /// `AllowWrites` informs the replica that it can transition out of the
    /// read-only stage and into the read-write computation stage.
    /// It is now allowed to effect changes to external systems (writes).
    ///
    /// See `ComputeCommand::AllowWrites` for details. This command works
    /// analogously to the compute version.
    AllowWrites,
    /// Update storage instance configuration.
    UpdateConfiguration(Box<StorageParameters>),
    /// Run the specified ingestion dataflow.
    RunIngestion(Box<RunIngestionCommand>),
    /// Enable compaction in storage-managed collections.
    ///
    /// A collection ID and a frontier after which accumulations must be correct.
    AllowCompaction(GlobalId, Antichain<T>),
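    /// Run the specified sink dataflow.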
    RunSink(Box<RunSinkCommand<T>>),
    /// Run a dataflow which will ingest data from an external source and only __stage__ it in
    /// Persist.
    ///
    /// Unlike regular ingestions/sources, some other component (e.g. `environmentd`) is
    /// responsible for linking the staged data into a shard.
    RunOneshotIngestion(Box<RunOneshotIngestion>),
    /// `CancelOneshotIngestion` instructs the replica to cancel the identified oneshot ingestion.
    ///
    /// It is invalid to send a [`CancelOneshotIngestion`] command that references a oneshot
    /// ingestion that was not created by a corresponding [`RunOneshotIngestion`] command before.
    /// Doing so may cause the replica to exhibit undefined behavior.
    ///
    /// [`CancelOneshotIngestion`]: crate::client::StorageCommand::CancelOneshotIngestion
    /// [`RunOneshotIngestion`]: crate::client::StorageCommand::RunOneshotIngestion
    CancelOneshotIngestion(Uuid),
}

impl<T> StorageCommand<T> {
    /// Returns whether this command instructs the installation of storage objects.
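    ///
    /// For example (an illustrative sketch):
    ///
    /// ```ignore
    /// let cmd = StorageCommand::<mz_repr::Timestamp>::InitializationComplete;
    /// assert!(!cmd.installs_objects());
    /// ```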
    pub fn installs_objects(&self) -> bool {
        use StorageCommand::*;
        match self {
            Hello { .. }
            | InitializationComplete
            | AllowWrites
            | UpdateConfiguration(_)
            | AllowCompaction(_, _)
            | CancelOneshotIngestion { .. } => false,
            // TODO(cf2): multi-replica oneshot ingestions. At the moment returning
            // true here means we can't run `COPY FROM` on multi-replica clusters, this
            // should be easy enough to support though.
            RunIngestion(_) | RunSink(_) | RunOneshotIngestion(_) => true,
        }
    }
}

/// A command that starts ingesting the given ingestion description.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct RunIngestionCommand {
    /// The ID of the storage collection being ingested.
    pub id: GlobalId,
    /// The description of what source type should be ingested and what post-processing steps must
    /// be applied to the data before they are written down to the storage collection.
    pub description: IngestionDescription<CollectionMetadata>,
}

/// A command that starts a oneshot ingestion, staging batches for the given collection.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct RunOneshotIngestion {
    /// The ID of the ingestion dataflow.
    pub ingestion_id: uuid::Uuid,
    /// The ID of the collection we'll stage batches for.
    pub collection_id: GlobalId,
    /// Metadata for the collection we'll stage batches for.
    pub collection_meta: CollectionMetadata,
    /// Details for the oneshot ingestion.
    pub request: OneshotIngestionRequest,
}

/// A command that starts exporting the given sink description.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct RunSinkCommand<T> {
    pub id: GlobalId,
    pub description: StorageSinkDesc<CollectionMetadata, T>,
}

/// A "kind" enum for statuses tracked by the health operator.
#[derive(Copy, Clone, Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub enum Status {
    Starting,
    Running,
    Paused,
    Stalled,
    /// This status is currently unused.
    // re-design the ceased status
    Ceased,
    Dropped,
}

impl std::str::FromStr for Status {
    type Err = anyhow::Error;
    /// Keep in sync with [`Status::to_str`].
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        Ok(match s {
            "starting" => Status::Starting,
            "running" => Status::Running,
            "paused" => Status::Paused,
            "stalled" => Status::Stalled,
            "ceased" => Status::Ceased,
            "dropped" => Status::Dropped,
            s => return Err(anyhow::anyhow!("{} is not a valid status", s)),
        })
    }
}

impl Status {
    /// Keep in sync with `Status::from_str`.
    pub fn to_str(&self) -> &'static str {
        match self {
            Status::Starting => "starting",
            Status::Running => "running",
            Status::Paused => "paused",
            Status::Stalled => "stalled",
            Status::Ceased => "ceased",
            Status::Dropped => "dropped",
        }
    }

    /// Determines if a new status should be produced in the context of a previous
    /// status.
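    ///
    /// For example (illustrative): `Dropped` supersedes every other status, while
    /// a repeated `Paused` is not re-reported.
    ///
    /// ```ignore
    /// assert!(Status::Paused.superseded_by(Status::Dropped));
    /// assert!(!Status::Paused.superseded_by(Status::Paused));
    /// assert!(!Status::Dropped.superseded_by(Status::Running));
    /// ```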
    pub fn superseded_by(self, new: Status) -> bool {
        match (self, new) {
            (_, Status::Dropped) => true,
            (Status::Dropped, _) => false,
            // Don't re-mark the object as paused.
            (Status::Paused, Status::Paused) => false,
            // De-duplication of other statuses is currently managed by the
            // `health_operator`.
            _ => true,
        }
    }
}

/// A source or sink status update.
///
/// Represents a status update for a given object. The update should be able to be
/// packed into a status row that conforms to the schema of the object's status
/// history relation.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct StatusUpdate {
    pub id: GlobalId,
    pub status: Status,
    pub timestamp: chrono::DateTime<chrono::Utc>,
    pub error: Option<String>,
    pub hints: BTreeSet<String>,
    pub namespaced_errors: BTreeMap<String, String>,
    pub replica_id: Option<ReplicaId>,
}

impl StatusUpdate {
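    /// Creates a [`StatusUpdate`] for the given object, time, and status, with no
    /// error, hints, namespaced errors, or replica ID attached.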
    pub fn new(
        id: GlobalId,
        timestamp: chrono::DateTime<chrono::Utc>,
        status: Status,
    ) -> StatusUpdate {
        StatusUpdate {
            id,
            timestamp,
            status,
            error: None,
            hints: Default::default(),
            namespaced_errors: Default::default(),
            replica_id: None,
        }
    }
}

impl From<StatusUpdate> for Row {
    fn from(update: StatusUpdate) -> Self {
        use mz_repr::Datum;

        let timestamp = Datum::TimestampTz(update.timestamp.try_into().expect("must fit"));
        let id = update.id.to_string();
        let id = Datum::String(&id);
        let status = Datum::String(update.status.to_str());
        let error = update.error.as_deref().into();

        let mut row = Row::default();
        let mut packer = row.packer();
        packer.extend([timestamp, id, status, error]);

        if !update.hints.is_empty() || !update.namespaced_errors.is_empty() {
            packer.push_dict_with(|dict_packer| {
                // The `hints` and `namespaced` keys are pushed in order,
                // as are the BTree collections they each contain.
                if !update.hints.is_empty() {
                    dict_packer.push(Datum::String("hints"));
                    dict_packer.push_list(update.hints.iter().map(|s| Datum::String(s)));
                }
                if !update.namespaced_errors.is_empty() {
                    dict_packer.push(Datum::String("namespaced"));
                    dict_packer.push_dict(
                        update
                            .namespaced_errors
                            .iter()
                            .map(|(k, v)| (k.as_str(), Datum::String(v))),
                    );
                }
            });
        } else {
            packer.push(Datum::Null);
        }

        match update.replica_id {
            Some(id) => packer.push(Datum::String(&id.to_string())),
            None => packer.push(Datum::Null),
        }

        row
    }
}

/// An update to an append-only collection.
pub enum AppendOnlyUpdate {
    Row((Row, Diff)),
    Status(StatusUpdate),
}

impl AppendOnlyUpdate {
    pub fn into_row(self) -> (Row, Diff) {
        match self {
            AppendOnlyUpdate::Row((row, diff)) => (row, diff),
            AppendOnlyUpdate::Status(status) => (Row::from(status), Diff::ONE),
        }
    }
}

impl From<(Row, Diff)> for AppendOnlyUpdate {
    fn from((row, diff): (Row, Diff)) -> Self {
        Self::Row((row, diff))
    }
}

impl From<StatusUpdate> for AppendOnlyUpdate {
    fn from(update: StatusUpdate) -> Self {
        Self::Status(update)
    }
}

/// Responses that the storage nature of a worker/dataflow can provide back to the coordinator.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub enum StorageResponse<T = mz_repr::Timestamp> {
    /// A new upper frontier for the specified identifier.
    FrontierUpper(GlobalId, Antichain<T>),
    /// Punctuation indicating that no more responses will be transmitted for the specified ID.
    DroppedId(GlobalId),
    /// Batches that have been staged in Persist and may later be linked into a shard.
    StagedBatches(BTreeMap<uuid::Uuid, Vec<Result<ProtoBatch, String>>>),
    /// A list of statistics updates for sources and sinks.
    StatisticsUpdates(Vec<SourceStatisticsUpdate>, Vec<SinkStatisticsUpdate>),
    /// A status update for a source or a sink. Periodically sent from
    /// storage workers to convey the latest status information about an object.
    StatusUpdate(StatusUpdate),
}

/// Maintained state for partitioned storage clients.
///
/// This helper type unifies the responses of multiple partitioned
/// workers in order to present as a single worker.
#[derive(Debug)]
pub struct PartitionedStorageState<T> {
    /// Number of partitions the state machine represents.
    parts: usize,
    /// Upper frontiers for sources and sinks, both unioned across all partitions and from each
    /// individual partition.
    uppers: BTreeMap<GlobalId, (MutableAntichain<T>, Vec<Option<Antichain<T>>>)>,
    /// Staged batches from oneshot sources that will get appended by `environmentd`.
    oneshot_source_responses:
        BTreeMap<uuid::Uuid, BTreeMap<usize, Vec<Result<ProtoBatch, String>>>>,
}

impl<T> Partitionable<StorageCommand<T>, StorageResponse<T>>
    for (StorageCommand<T>, StorageResponse<T>)
where
    T: timely::progress::Timestamp + Lattice,
{
    type PartitionedState = PartitionedStorageState<T>;

    fn new(parts: usize) -> PartitionedStorageState<T> {
        PartitionedStorageState {
            parts,
            uppers: BTreeMap::new(),
            oneshot_source_responses: BTreeMap::new(),
        }
    }
}

impl<T> PartitionedStorageState<T>
where
    T: timely::progress::Timestamp,
{
    fn observe_command(&mut self, command: &StorageCommand<T>) {
        // Note that `observe_command` is quite different in `mz_compute_client`.
        // Compute (currently) only sends the command to 1 process,
        // but storage fans out to all workers, allowing the storage processes
        // to self-coordinate how commands and internal commands are ordered.
        //
        // TODO(guswynn): cluster-unification: consolidate this with compute.
        let _ = match command {
            StorageCommand::Hello { .. } => {}
            StorageCommand::RunIngestion(ingestion) => {
                self.insert_new_uppers(ingestion.description.collection_ids());
            }
            StorageCommand::RunSink(export) => {
                self.insert_new_uppers([export.id]);
            }
            StorageCommand::InitializationComplete
            | StorageCommand::AllowWrites
            | StorageCommand::UpdateConfiguration(_)
            | StorageCommand::AllowCompaction(_, _)
            | StorageCommand::RunOneshotIngestion(_)
            | StorageCommand::CancelOneshotIngestion { .. } => {}
        };
    }

    /// Shared implementation for commands that install uppers.
    ///
    /// Starts tracking the upper frontier for each of the given IDs. IDs that are
    /// already tracked are left unchanged.
    fn insert_new_uppers<I: IntoIterator<Item = GlobalId>>(&mut self, ids: I) {
        for id in ids {
            self.uppers.entry(id).or_insert_with(|| {
                let mut frontier = MutableAntichain::new();
                // TODO(guswynn): cluster-unification: fix this dangerous use of `as`, by
                // merging the types that compute and storage use.
                #[allow(clippy::as_conversions)]
                frontier.update_iter(iter::once((T::minimum(), self.parts as i64)));
                let part_frontiers = vec![Some(Antichain::from_elem(T::minimum())); self.parts];

                (frontier, part_frontiers)
            });
        }
    }
}

impl<T> PartitionedState<StorageCommand<T>, StorageResponse<T>> for PartitionedStorageState<T>
where
    T: timely::progress::Timestamp + Lattice,
{
    fn split_command(&mut self, command: StorageCommand<T>) -> Vec<Option<StorageCommand<T>>> {
        self.observe_command(&command);

        // Fan out to all processes (which will fan out to all workers).
        // StorageState manages ordering of commands internally.
        vec![Some(command); self.parts]
    }

    fn absorb_response(
        &mut self,
        shard_id: usize,
        response: StorageResponse<T>,
    ) -> Option<Result<StorageResponse<T>, anyhow::Error>> {
        match response {
            // Avoid multiple retractions of minimum time, to present as updates from one worker.
            StorageResponse::FrontierUpper(id, new_shard_upper) => {
                let (frontier, shard_frontiers) = match self.uppers.get_mut(&id) {
                    Some(value) => value,
                    None => panic!("Reference to absent collection: {id}"),
                };
                let old_upper = frontier.frontier().to_owned();
                let shard_upper = match &mut shard_frontiers[shard_id] {
                    Some(shard_upper) => shard_upper,
                    None => panic!("Reference to absent shard {shard_id} for collection {id}"),
                };
                frontier.update_iter(shard_upper.iter().map(|t| (t.clone(), -1)));
                frontier.update_iter(new_shard_upper.iter().map(|t| (t.clone(), 1)));
                shard_upper.join_assign(&new_shard_upper);

                let new_upper = frontier.frontier();
                if PartialOrder::less_than(&old_upper.borrow(), &new_upper) {
                    Some(Ok(StorageResponse::FrontierUpper(id, new_upper.to_owned())))
                } else {
                    None
                }
            }
            StorageResponse::DroppedId(id) => {
                let (_, shard_frontiers) = match self.uppers.get_mut(&id) {
                    Some(value) => value,
                    None => panic!("Reference to absent collection: {id}"),
                };
                let prev = shard_frontiers[shard_id].take();
                assert!(
                    prev.is_some(),
                    "got double drop for {id} from shard {shard_id}"
                );

                if shard_frontiers.iter().all(Option::is_none) {
                    self.uppers.remove(&id);
                    Some(Ok(StorageResponse::DroppedId(id)))
                } else {
                    None
                }
            }
            StorageResponse::StatisticsUpdates(source_stats, sink_stats) => {
                // Just forward it along; the `worker_id` should have been set in `storage_state`.
                // We _could_ consolidate across worker IDs here, but each worker only produces
                // responses periodically, so we avoid that complexity.
                Some(Ok(StorageResponse::StatisticsUpdates(
                    source_stats,
                    sink_stats,
                )))
            }
            StorageResponse::StatusUpdate(updates) => {
                Some(Ok(StorageResponse::StatusUpdate(updates)))
            }
            StorageResponse::StagedBatches(batches) => {
                let mut finished_batches = BTreeMap::new();

                for (collection_id, batches) in batches {
                    tracing::info!(%shard_id, %collection_id, "got batch");

                    let entry = self
                        .oneshot_source_responses
                        .entry(collection_id)
                        .or_default();
                    let novel = entry.insert(shard_id, batches);
                    assert_none!(novel, "Duplicate oneshot source response");

                    // Check if we've received responses from all shards.
                    if entry.len() == self.parts {
                        let entry = self
                            .oneshot_source_responses
                            .remove(&collection_id)
                            .expect("checked above");
                        let all_batches: Vec<_> = entry.into_values().flatten().collect();

                        finished_batches.insert(collection_id, all_batches);
                    }
                }

                if !finished_batches.is_empty() {
                    Some(Ok(StorageResponse::StagedBatches(finished_batches)))
                } else {
                    None
                }
            }
        }
    }
}

#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
/// A batch of updates to be fed to a local input.
pub struct Update<T = mz_repr::Timestamp> {
    pub row: Row,
    pub timestamp: T,
    pub diff: Diff,
}

#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
/// A batch of updates to be fed to a local input; however, the input must
/// determine the most appropriate timestamps to use.
///
/// TODO(cf2): Can we remove this and use only [`TableData`]?
pub struct TimestamplessUpdate {
    pub row: Row,
    pub diff: Diff,
}

#[derive(Debug, Clone, PartialEq)]
pub enum TableData {
    /// Rows that still need to be persisted and appended.
    ///
    /// The contained [`Row`]s are _not_ consolidated.
    Rows(Vec<(Row, Diff)>),
    /// Batches already staged in Persist, ready to be appended.
    Batches(SmallVec<[ProtoBatch; 1]>),
}

impl TableData {
    pub fn is_empty(&self) -> bool {
        match self {
            TableData::Rows(rows) => rows.is_empty(),
            TableData::Batches(batches) => batches.is_empty(),
        }
    }
}

/// A collection of timestamp-less updates. As updates are added to the builder
/// they are automatically spilled to blob storage.
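///
/// A minimal usage sketch (illustrative only; assumes a persist [`WriteHandle`]
/// named `write_handle` and `key`, `value`, and `diff` values of the appropriate
/// types in scope):
///
/// ```ignore
/// let mut builder = TimestamplessUpdateBuilder::new(&write_handle);
/// builder.add(&key, &value, &diff).await;
/// let batch: ProtoBatch = builder.finish().await;
/// // `batch` can later be re-timestamped and linked into the destination shard.
/// ```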
pub struct TimestamplessUpdateBuilder<K, V, T, D>
where
    K: Codec,
    V: Codec,
    T: Timestamp + Lattice + Codec64,
    D: Codec64,
{
    builder: BatchBuilder<K, V, T, D>,
    initial_ts: T,
}

impl<K, V, T, D> TimestamplessUpdateBuilder<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: TimestampManipulation + Lattice + Codec64 + Sync,
    D: Semigroup + Ord + Codec64 + Send + Sync,
{
    /// Create a new [`TimestamplessUpdateBuilder`] for the shard associated
    /// with the provided [`WriteHandle`].
    pub fn new(handle: &WriteHandle<K, V, T, D>) -> Self {
        let initial_ts = T::minimum();
        let builder = handle.builder(Antichain::from_elem(initial_ts.clone()));
        TimestamplessUpdateBuilder {
            builder,
            initial_ts,
        }
    }

    /// Add a `(K, V, D)` to the staged batch.
    pub async fn add(&mut self, k: &K, v: &V, d: &D) {
        self.builder
            .add(k, v, &self.initial_ts, d)
            .await
            .expect("invalid Persist usage");
    }

    /// Finish the builder and return a [`ProtoBatch`] which can later be linked into a shard.
    ///
    /// The returned batch has nonsensical lower and upper bounds and must be re-written before
    /// being appended to the destination shard.
    pub async fn finish(self) -> ProtoBatch {
        let finish_ts = StepForward::step_forward(&self.initial_ts);
        let batch = self
            .builder
            .finish(Antichain::from_elem(finish_ts))
            .await
            .expect("invalid Persist usage");

        batch.into_transmittable_batch()
    }
}

impl TryIntoProtocolNonce for StorageCommand {
    fn try_into_protocol_nonce(self) -> Result<Uuid, Self> {
        match self {
            StorageCommand::Hello { nonce } => Ok(nonce),
            cmd => Err(cmd),
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Test to ensure the size of the `StorageCommand` enum doesn't regress.
    #[mz_ore::test]
    fn test_storage_command_size() {
        assert_eq!(std::mem::size_of::<StorageCommand>(), 40);
    }

    /// Test to ensure the size of the `StorageResponse` enum doesn't regress.
    #[mz_ore::test]
    fn test_storage_response_size() {
        assert_eq!(std::mem::size_of::<StorageResponse>(), 120);
    }
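
    /// Illustrative sketch of a round-trip check: `Status::to_str` and
    /// `Status::from_str` are documented as needing to stay in sync, so every
    /// variant should parse back to itself.
    #[mz_ore::test]
    fn test_status_round_trip() {
        let statuses = [
            Status::Starting,
            Status::Running,
            Status::Paused,
            Status::Stalled,
            Status::Ceased,
            Status::Dropped,
        ];
        for status in statuses {
            let parsed: Status = status.to_str().parse().expect("valid status string");
            assert_eq!(parsed, status);
        }
    }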
641}