// mz_storage_client/controller.rs
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! A controller that provides an interface to the storage layer.
//!
//! The storage controller curates the creation of sources, the progress of readers through these collections,
//! and their eventual dropping and resource reclamation.
//!
//! The storage controller can be viewed as a partial map from `GlobalId` to collection. It is an error to
//! use an identifier before it has been "created" with `create_source()`. Once created, the controller holds
//! a read capability for each source, which is manipulated with `update_read_capabilities()`.
//! Eventually, the source is dropped with either `drop_sources()` or by allowing compaction to the
//! empty frontier.
use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Debug;
use std::future::Future;
use std::num::NonZeroI64;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;

use async_trait::async_trait;
use chrono::{DateTime, Utc};
use differential_dataflow::lattice::Lattice;
use mz_cluster_client::ReplicaId;
use mz_cluster_client::client::ClusterReplicaLocation;
use mz_controller_types::dyncfgs::WALLCLOCK_LAG_HISTOGRAM_PERIOD_INTERVAL;
use mz_dyncfg::ConfigSet;
use mz_ore::soft_panic_or_log;
use mz_persist_client::batch::ProtoBatch;
use mz_persist_types::{Codec64, ShardId};
use mz_repr::adt::interval::Interval;
use mz_repr::adt::timestamp::CheckedTimestamp;
use mz_repr::{Datum, Diff, GlobalId, RelationDesc, RelationVersion, Row, Timestamp};
use mz_storage_types::configuration::StorageConfiguration;
use mz_storage_types::connections::inline::InlinedConnection;
use mz_storage_types::controller::{CollectionMetadata, StorageError};
use mz_storage_types::errors::CollectionMissing;
use mz_storage_types::instances::StorageInstanceId;
use mz_storage_types::oneshot_sources::{OneshotIngestionRequest, OneshotResultCallback};
use mz_storage_types::parameters::StorageParameters;
use mz_storage_types::read_holds::ReadHold;
use mz_storage_types::read_policy::ReadPolicy;
use mz_storage_types::sinks::{StorageSinkConnection, StorageSinkDesc};
use mz_storage_types::sources::{
    GenericSourceConnection, IngestionDescription, SourceDesc, SourceExportDataConfig,
    SourceExportDetails, Timeline,
};
use serde::{Deserialize, Serialize};
use timely::progress::Antichain;
use timely::progress::frontier::MutableAntichain;
use tokio::sync::{mpsc, oneshot};

use crate::client::{AppendOnlyUpdate, StatusUpdate, TableData};
use crate::statistics::WebhookStatistics;
64#[derive(
65    Clone,
66    Copy,
67    Debug,
68    Serialize,
69    Deserialize,
70    Eq,
71    PartialEq,
72    Hash,
73    PartialOrd,
74    Ord
75)]
76pub enum IntrospectionType {
77    /// We're not responsible for appending to this collection automatically, but we should
78    /// automatically bump the write frontier from time to time.
79    SinkStatusHistory,
80    SourceStatusHistory,
81    ShardMapping,
82
83    Frontiers,
84    ReplicaFrontiers,
85
86    ReplicaStatusHistory,
87    ReplicaMetricsHistory,
88    WallclockLagHistory,
89    WallclockLagHistogram,
90
91    // Note that this single-shard introspection source will be changed to per-replica,
92    // once we allow multiplexing multiple sources/sinks on a single cluster.
93    StorageSourceStatistics,
94    StorageSinkStatistics,
95
96    // The below are for statement logging.
97    StatementExecutionHistory,
98    SessionHistory,
99    PreparedStatementHistory,
100    SqlText,
101    // For statement lifecycle logging, which is closely related
102    // to statement logging
103    StatementLifecycleHistory,
104
105    // Collections written by the compute controller.
106    ComputeDependencies,
107    ComputeOperatorHydrationStatus,
108    ComputeMaterializedViewRefreshes,
109    ComputeErrorCounts,
110    ComputeHydrationTimes,
111
112    // Written by the Adapter for tracking AWS PrivateLink Connection Status History
113    PrivatelinkConnectionStatusHistory,
114}
115
116/// Describes how data is written to the collection.
117#[derive(Clone, Debug, Eq, PartialEq)]
118pub enum DataSource {
119    /// Ingest data from some external source.
120    Ingestion(IngestionDescription),
121    /// This source receives its data from the identified ingestion,
122    /// from an external object identified using `SourceExportDetails`.
123    ///
124    /// The referenced ingestion must be created before all of its exports.
125    IngestionExport {
126        ingestion_id: GlobalId,
127        details: SourceExportDetails,
128        data_config: SourceExportDataConfig,
129    },
130    /// Data comes from introspection sources, which the controller itself is
131    /// responsible for generating.
132    Introspection(IntrospectionType),
133    /// Data comes from the source's remapping/reclock operator.
134    Progress,
135    /// Data comes from external HTTP requests pushed to Materialize.
136    Webhook,
137    /// The adapter layer appends timestamped data, i.e. it is a `TABLE`.
138    Table,
139    /// This source's data does not need to be managed by the storage
140    /// controller, e.g. it's a materialized view or the catalog collection.
141    Other,
142    /// This collection is the output collection of a sink.
143    Sink { desc: ExportDescription },
144}
145
146/// Describes a request to create a source.
147#[derive(Clone, Debug, Eq, PartialEq)]
148pub struct CollectionDescription {
149    /// The schema of this collection
150    pub desc: RelationDesc,
151    /// The source of this collection's data.
152    pub data_source: DataSource,
153    /// An optional frontier to which the collection's `since` should be advanced.
154    pub since: Option<Antichain<Timestamp>>,
155    /// The timeline of the source. Absent for materialized views, continual tasks, etc.
156    pub timeline: Option<Timeline>,
157    /// The primary of this collections.
158    ///
159    /// Multiple storage collections can point to the same persist shard,
160    /// possibly with different schemas. In such a configuration, we select one
161    /// of the involved collections as the primary, who "owns" the persist
162    /// shard. All other involved collections have a dependency on the primary.
163    pub primary: Option<GlobalId>,
164}
165
166impl CollectionDescription {
167    /// Create a CollectionDescription for [`DataSource::Other`].
168    pub fn for_other(desc: RelationDesc, since: Option<Antichain<Timestamp>>) -> Self {
169        Self {
170            desc,
171            data_source: DataSource::Other,
172            since,
173            timeline: None,
174            primary: None,
175        }
176    }
177
178    /// Create a CollectionDescription for a table.
179    pub fn for_table(desc: RelationDesc) -> Self {
180        Self {
181            desc,
182            data_source: DataSource::Table,
183            since: None,
184            timeline: Some(Timeline::EpochMilliseconds),
185            primary: None,
186        }
187    }
188}
189
190#[derive(Clone, Debug, Eq, PartialEq)]
191pub struct ExportDescription<T = mz_repr::Timestamp> {
192    pub sink: StorageSinkDesc<(), T>,
193    /// The ID of the instance in which to install the export.
194    pub instance_id: StorageInstanceId,
195}
196
197#[derive(Debug)]
198pub enum Response {
199    FrontierUpdates(Vec<(GlobalId, Antichain<Timestamp>)>),
200}
201
202/// Metadata that the storage controller must know to properly handle the life
203/// cycle of creating and dropping collections.
204///
205/// This data should be kept consistent with the state modified using
206/// [`StorageTxn`].
207///
208/// n.b. the "txn WAL shard" is also metadata that's persisted, but if we
209/// included it in this struct it would never be read.
210#[derive(Debug, Clone, Serialize, Default)]
211pub struct StorageMetadata {
212    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
213    pub collection_metadata: BTreeMap<GlobalId, ShardId>,
214    pub unfinalized_shards: BTreeSet<ShardId>,
215}
216
217impl StorageMetadata {
218    pub fn get_collection_shard(&self, id: GlobalId) -> Result<ShardId, StorageError> {
219        let shard_id = self
220            .collection_metadata
221            .get(&id)
222            .ok_or(StorageError::IdentifierMissing(id))?;
223
224        Ok(*shard_id)
225    }
226}
227
228/// Provides an interface for the storage controller to read and write data that
229/// is recorded elsewhere.
230///
231/// Data written to the implementor of this trait should make a consistent view
232/// of the data available through [`StorageMetadata`].
233#[async_trait]
234pub trait StorageTxn {
235    /// Retrieve all of the visible storage metadata.
236    ///
237    /// The value of this map should be treated as opaque.
238    fn get_collection_metadata(&self) -> BTreeMap<GlobalId, ShardId>;
239
240    /// Add new storage metadata for a collection.
241    ///
242    /// Subsequent calls to [`StorageTxn::get_collection_metadata`] must include
243    /// this data.
244    fn insert_collection_metadata(
245        &mut self,
246        s: BTreeMap<GlobalId, ShardId>,
247    ) -> Result<(), StorageError>;
248
249    /// Remove the metadata associated with the identified collections.
250    ///
251    /// Subsequent calls to [`StorageTxn::get_collection_metadata`] must not
252    /// include these keys.
253    fn delete_collection_metadata(&mut self, ids: BTreeSet<GlobalId>) -> Vec<(GlobalId, ShardId)>;
254
255    /// Retrieve all of the shards that are no longer in use by an active
256    /// collection but are yet to be finalized.
257    fn get_unfinalized_shards(&self) -> BTreeSet<ShardId>;
258
259    /// Insert the specified values as unfinalized shards.
260    fn insert_unfinalized_shards(&mut self, s: BTreeSet<ShardId>) -> Result<(), StorageError>;
261
262    /// Mark the specified shards as finalized, deleting them from the
263    /// unfinalized shard collection.
264    fn mark_shards_as_finalized(&mut self, shards: BTreeSet<ShardId>);
265
266    /// Get the txn WAL shard for this environment if it exists.
267    fn get_txn_wal_shard(&self) -> Option<ShardId>;
268
269    /// Store the specified shard as the environment's txn WAL shard.
270    ///
271    /// The implementor should error if the shard is already specified.
272    fn write_txn_wal_shard(&mut self, shard: ShardId) -> Result<(), StorageError>;
273}
274
275pub type BoxFuture<T> = Pin<Box<dyn Future<Output = T> + Send + 'static>>;
276
277/// A predicate for a `Row` filter.
278pub type RowPredicate = Box<dyn Fn(&Row) -> bool + Send + Sync>;
279
280/// High-level write operations applicable to storage collections.
281pub enum StorageWriteOp {
282    /// Append a set of rows with specified multiplicities.
283    ///
284    /// The multiplicities may be negative, so an `Append` operation can perform
285    /// both insertions and retractions.
286    Append { updates: Vec<(Row, Diff)> },
287    /// Delete all rows matching the given predicate.
288    Delete { filter: RowPredicate },
289}
290
291impl StorageWriteOp {
292    /// Returns whether this operation appends an empty set of updates.
293    pub fn is_empty_append(&self) -> bool {
294        match self {
295            Self::Append { updates } => updates.is_empty(),
296            Self::Delete { .. } => false,
297        }
298    }
299}
300
301#[async_trait(?Send)]
302pub trait StorageController: Debug {
303    /// Marks the end of any initialization commands.
304    ///
305    /// The implementor may wait for this method to be called before implementing prior commands,
306    /// and so it is important for a user to invoke this method as soon as it is comfortable.
307    /// This method can be invoked immediately, at the potential expense of performance.
308    fn initialization_complete(&mut self);
309
310    /// Update storage configuration with new parameters.
311    fn update_parameters(&mut self, config_params: StorageParameters);
312
313    /// Get the current configuration, including parameters updated with `update_parameters`.
314    fn config(&self) -> &StorageConfiguration;
315
316    /// Returns the [CollectionMetadata] of the collection identified by `id`.
317    fn collection_metadata(&self, id: GlobalId) -> Result<CollectionMetadata, CollectionMissing>;
318
319    /// Returns `true` iff the given collection/ingestion has been hydrated.
320    ///
321    /// For this check, zero-replica clusters are always considered hydrated.
322    /// Their collections would never normally be considered hydrated but it's
323    /// clearly intentional that they have no replicas.
324    fn collection_hydrated(&self, collection_id: GlobalId) -> Result<bool, StorageError>;
325
326    /// Returns `true` if each non-transient, non-excluded collection is
327    /// hydrated on at least one of the provided replicas.
328    ///
329    /// If no replicas are provided, this checks for hydration on _any_ replica.
330    ///
331    /// This also returns `true` in case this cluster does not have any
332    /// replicas.
333    fn collections_hydrated_on_replicas(
334        &self,
335        target_replica_ids: Option<Vec<ReplicaId>>,
336        target_cluster_ids: &StorageInstanceId,
337        exclude_collections: &BTreeSet<GlobalId>,
338    ) -> Result<bool, StorageError>;
339
340    /// Returns the since/upper frontiers of the identified collection.
341    fn collection_frontiers(
342        &self,
343        id: GlobalId,
344    ) -> Result<(Antichain<Timestamp>, Antichain<Timestamp>), CollectionMissing>;
345
346    /// Returns the since/upper frontiers of the identified collections.
347    ///
348    /// Having a method that returns both frontiers at the same time, for all
349    /// requested collections, ensures that we can get a consistent "snapshot"
350    /// of collection state. If we had separate methods instead, and/or would
351    /// allow getting frontiers for collections one at a time, it could happen
352    /// that collection state changes concurrently, while information is
353    /// gathered.
354    fn collections_frontiers(
355        &self,
356        id: Vec<GlobalId>,
357    ) -> Result<Vec<(GlobalId, Antichain<Timestamp>, Antichain<Timestamp>)>, CollectionMissing>;
358
359    /// Acquire an iterator over [CollectionMetadata] for all active
360    /// collections.
361    ///
362    /// A collection is "active" when it has a non-empty frontier of read
363    /// capabilities.
364    fn active_collection_metadatas(&self) -> Vec<(GlobalId, CollectionMetadata)>;
365
366    /// Returns the IDs of ingestion exports running on the given instance. This
367    /// includes the ingestion itself, if any, and running source tables (aka.
368    /// subsources).
369    fn active_ingestion_exports(
370        &self,
371        instance_id: StorageInstanceId,
372    ) -> Box<dyn Iterator<Item = &GlobalId> + '_>;
373
374    /// Checks whether a collection exists under the given `GlobalId`. Returns
375    /// an error if the collection does not exist.
376    fn check_exists(&self, id: GlobalId) -> Result<(), StorageError>;
377
378    /// Creates a storage instance with the specified ID.
379    ///
380    /// A storage instance can have zero or one replicas. The instance is
381    /// created with zero replicas.
382    ///
383    /// Panics if a storage instance with the given ID already exists.
384    fn create_instance(&mut self, id: StorageInstanceId, workload_class: Option<String>);
385
386    /// Drops the storage instance with the given ID.
387    ///
388    /// If you call this method while the storage instance has a replica
389    /// attached, that replica will be leaked. Call `drop_replica` first.
390    ///
391    /// Panics if a storage instance with the given ID does not exist.
392    fn drop_instance(&mut self, id: StorageInstanceId);
393
394    /// Updates a storage instance's workload class.
395    fn update_instance_workload_class(
396        &mut self,
397        id: StorageInstanceId,
398        workload_class: Option<String>,
399    );
400
401    /// Connects the storage instance to the specified replica.
402    ///
403    /// If the storage instance is already attached to a replica, communication
404    /// with that replica is severed in favor of the new replica.
405    ///
406    /// In the future, this API will be adjusted to support active replication
407    /// of storage instances (i.e., multiple replicas attached to a given
408    /// storage instance).
409    fn connect_replica(
410        &mut self,
411        instance_id: StorageInstanceId,
412        replica_id: ReplicaId,
413        location: ClusterReplicaLocation,
414    );
415
416    /// Disconnects the storage instance from the specified replica.
417    fn drop_replica(&mut self, instance_id: StorageInstanceId, replica_id: ReplicaId);
418
419    /// Across versions of Materialize the nullability of columns for some objects can change based
420    /// on updates to our optimizer.
421    ///
422    /// During bootstrap we will register these new schemas with Persist.
423    ///
424    /// See: <https://github.com/MaterializeInc/database-issues/issues/2488>
425    async fn evolve_nullability_for_bootstrap(
426        &mut self,
427        storage_metadata: &StorageMetadata,
428        collections: Vec<(GlobalId, RelationDesc)>,
429    ) -> Result<(), StorageError>;
430
431    /// Create the sources described in the individual RunIngestionCommand commands.
432    ///
433    /// Each command carries the source id, the source description, and any associated metadata
434    /// needed to ingest the particular source.
435    ///
436    /// This command installs collection state for the indicated sources, and they are
437    /// now valid to use in queries at times beyond the initial `since` frontiers. Each
438    /// collection also acquires a read capability at this frontier, which will need to
439    /// be repeatedly downgraded with `allow_compaction()` to permit compaction.
440    ///
441    /// This method is NOT idempotent; It can fail between processing of different
442    /// collections and leave the controller in an inconsistent state. It is almost
443    /// always wrong to do anything but abort the process on `Err`.
444    ///
445    /// The `register_ts` is used as the initial timestamp that tables are available for reads. (We
446    /// might later give non-tables the same treatment, but hold off on that initially.) Callers
447    /// must provide a Some if any of the collections is a table. A None may be given if none of the
448    /// collections are a table (i.e. all materialized views, sources, etc).
449    async fn create_collections(
450        &mut self,
451        storage_metadata: &StorageMetadata,
452        register_ts: Option<Timestamp>,
453        collections: Vec<(GlobalId, CollectionDescription)>,
454    ) -> Result<(), StorageError> {
455        self.create_collections_for_bootstrap(
456            storage_metadata,
457            register_ts,
458            collections,
459            &BTreeSet::new(),
460        )
461        .await
462    }
463
464    /// Like [`Self::create_collections`], except used specifically for bootstrap.
465    ///
466    /// `migrated_storage_collections` is a set of migrated storage collections to be excluded
467    /// from the txn-wal sub-system.
468    async fn create_collections_for_bootstrap(
469        &mut self,
470        storage_metadata: &StorageMetadata,
471        register_ts: Option<Timestamp>,
472        collections: Vec<(GlobalId, CollectionDescription)>,
473        migrated_storage_collections: &BTreeSet<GlobalId>,
474    ) -> Result<(), StorageError>;
475
476    /// Check that the ingestion associated with `id` can use the provided
477    /// [`SourceDesc`].
478    ///
479    /// Note that this check is optimistic and its return of `Ok(())` does not
480    /// guarantee that subsequent calls to `alter_ingestion_source_desc` are
481    /// guaranteed to succeed.
482    fn check_alter_ingestion_source_desc(
483        &mut self,
484        ingestion_id: GlobalId,
485        source_desc: &SourceDesc,
486    ) -> Result<(), StorageError>;
487
488    /// Alters each identified ingestion to use the correlated [`SourceDesc`].
489    async fn alter_ingestion_source_desc(
490        &mut self,
491        ingestion_ids: BTreeMap<GlobalId, SourceDesc>,
492    ) -> Result<(), StorageError>;
493
494    /// Alters each identified collection to use the correlated [`GenericSourceConnection`].
495    async fn alter_ingestion_connections(
496        &mut self,
497        source_connections: BTreeMap<GlobalId, GenericSourceConnection<InlinedConnection>>,
498    ) -> Result<(), StorageError>;
499
500    /// Alters the data config for the specified source exports of the specified ingestions.
501    async fn alter_ingestion_export_data_configs(
502        &mut self,
503        source_exports: BTreeMap<GlobalId, SourceExportDataConfig>,
504    ) -> Result<(), StorageError>;
505
506    async fn alter_table_desc(
507        &mut self,
508        existing_collection: GlobalId,
509        new_collection: GlobalId,
510        new_desc: RelationDesc,
511        expected_version: RelationVersion,
512        register_ts: Timestamp,
513    ) -> Result<(), StorageError>;
514
515    /// Acquire an immutable reference to the export state, should it exist.
516    fn export(&self, id: GlobalId) -> Result<&ExportState, StorageError>;
517
518    /// Acquire a mutable reference to the export state, should it exist.
519    fn export_mut(&mut self, id: GlobalId) -> Result<&mut ExportState, StorageError>;
520
521    /// Create a oneshot ingestion.
522    async fn create_oneshot_ingestion(
523        &mut self,
524        ingestion_id: uuid::Uuid,
525        collection_id: GlobalId,
526        instance_id: StorageInstanceId,
527        request: OneshotIngestionRequest,
528        result_tx: OneshotResultCallback<ProtoBatch>,
529    ) -> Result<(), StorageError>;
530
531    /// Cancel a oneshot ingestion.
532    fn cancel_oneshot_ingestion(&mut self, ingestion_id: uuid::Uuid) -> Result<(), StorageError>;
533
534    /// Alter the sink identified by the given id to match the provided `ExportDescription`.
535    async fn alter_export(
536        &mut self,
537        id: GlobalId,
538        export: ExportDescription,
539    ) -> Result<(), StorageError>;
540
541    /// For each identified export, alter its [`StorageSinkConnection`].
542    async fn alter_export_connections(
543        &mut self,
544        exports: BTreeMap<GlobalId, StorageSinkConnection>,
545    ) -> Result<(), StorageError>;
546
547    /// Drops the read capability for the tables and allows their resources to be reclaimed.
548    fn drop_tables(
549        &mut self,
550        storage_metadata: &StorageMetadata,
551        identifiers: Vec<GlobalId>,
552        ts: Timestamp,
553    ) -> Result<(), StorageError>;
554
555    /// Drops the read capability for the sources and allows their resources to be reclaimed.
556    fn drop_sources(
557        &mut self,
558        storage_metadata: &StorageMetadata,
559        identifiers: Vec<GlobalId>,
560    ) -> Result<(), StorageError>;
561
562    /// Drops the read capability for the sinks and allows their resources to be reclaimed.
563    fn drop_sinks(
564        &mut self,
565        storage_metadata: &StorageMetadata,
566        identifiers: Vec<GlobalId>,
567    ) -> Result<(), StorageError>;
568
569    /// Drops the read capability for the sinks and allows their resources to be reclaimed.
570    ///
571    /// TODO(jkosh44): This method does not validate the provided identifiers. Currently when the
572    ///     controller starts/restarts it has no durable state. That means that it has no way of
573    ///     remembering any past commands sent. In the future we plan on persisting state for the
574    ///     controller so that it is aware of past commands.
575    ///     Therefore this method is for dropping sinks that we know to have been previously
576    ///     created, but have been forgotten by the controller due to a restart.
577    ///     Once command history becomes durable we can remove this method and use the normal
578    ///     `drop_sinks`.
579    fn drop_sinks_unvalidated(
580        &mut self,
581        storage_metadata: &StorageMetadata,
582        identifiers: Vec<GlobalId>,
583    );
584
585    /// Drops the read capability for the sources and allows their resources to be reclaimed.
586    ///
587    /// TODO(jkosh44): This method does not validate the provided identifiers. Currently when the
588    ///     controller starts/restarts it has no durable state. That means that it has no way of
589    ///     remembering any past commands sent. In the future we plan on persisting state for the
590    ///     controller so that it is aware of past commands.
591    ///     Therefore this method is for dropping sources that we know to have been previously
592    ///     created, but have been forgotten by the controller due to a restart.
593    ///     Once command history becomes durable we can remove this method and use the normal
594    ///     `drop_sources`.
595    fn drop_sources_unvalidated(
596        &mut self,
597        storage_metadata: &StorageMetadata,
598        identifiers: Vec<GlobalId>,
599    ) -> Result<(), StorageError>;
600
601    /// Append `updates` into the local input named `id` and advance its upper to `upper`.
602    ///
603    /// The method returns a oneshot that can be awaited to indicate completion of the write.
604    /// The method may return an error, indicating an immediately visible error, and also the
605    /// oneshot may return an error if one is encountered during the write.
606    ///
607    /// All updates in `commands` are applied atomically.
608    // TODO(petrosagg): switch upper to `Antichain<Timestamp>`
609    fn append_table(
610        &mut self,
611        write_ts: Timestamp,
612        advance_to: Timestamp,
613        commands: Vec<(GlobalId, Vec<TableData>)>,
614    ) -> Result<tokio::sync::oneshot::Receiver<Result<(), StorageError>>, StorageError>;
615
616    /// Returns a [`MonotonicAppender`] which is a channel that can be used to monotonically
617    /// append to the specified [`GlobalId`].
618    fn monotonic_appender(&self, id: GlobalId) -> Result<MonotonicAppender, StorageError>;
619
620    /// Returns a shared [`WebhookStatistics`] which can be used to report user-facing
621    /// statistics for this given webhhook, specified by the [`GlobalId`].
622    ///
623    // This is used to support a fairly special case, where a source needs to report statistics
624    // from outside the ordinary controller-clusterd path. Its possible to merge this with
625    // `monotonic_appender`, whose only current user is webhooks, but given that they will
626    // likely be moved to clusterd, we just leave this a special case.
627    fn webhook_statistics(&self, id: GlobalId) -> Result<Arc<WebhookStatistics>, StorageError>;
628
629    /// Waits until the controller is ready to process a response.
630    ///
631    /// This method may block for an arbitrarily long time.
632    ///
633    /// When the method returns, the owner should call
634    /// [`StorageController::process`] to process the ready message.
635    ///
636    /// This method is cancellation safe.
637    async fn ready(&mut self);
638
639    /// Processes the work queued by [`StorageController::ready`].
640    fn process(
641        &mut self,
642        storage_metadata: &StorageMetadata,
643    ) -> Result<Option<Response>, anyhow::Error>;
644
645    /// Exposes the internal state of the data shard for debugging and QA.
646    ///
647    /// We'll be thoughtful about making unnecessary changes, but the **output
648    /// of this method needs to be gated from users**, so that it's not subject
649    /// to our backward compatibility guarantees.
650    ///
651    /// TODO: Ideally this would return `impl Serialize` so the caller can do
652    /// with it what they like, but that doesn't work in traits yet. The
653    /// workaround (an associated type) doesn't work because persist doesn't
654    /// want to make the type public. In the meantime, move the `serde_json`
655    /// call from the single user into this method.
656    async fn inspect_persist_state(&self, id: GlobalId)
657    -> Result<serde_json::Value, anyhow::Error>;
658
659    /// Records append-only updates for the given introspection type.
660    ///
661    /// Rows passed in `updates` MUST have the correct schema for the given
662    /// introspection type, as readers rely on this and might panic otherwise.
663    fn append_introspection_updates(&mut self, type_: IntrospectionType, updates: Vec<(Row, Diff)>);
664
665    /// Records append-only status updates for the given introspection type.
666    fn append_status_introspection_updates(
667        &mut self,
668        type_: IntrospectionType,
669        updates: Vec<StatusUpdate>,
670    );
671
672    /// Updates the desired state of the given introspection type.
673    ///
674    /// Rows passed in `op` MUST have the correct schema for the given
675    /// introspection type, as readers rely on this and might panic otherwise.
676    fn update_introspection_collection(&mut self, type_: IntrospectionType, op: StorageWriteOp);
677
678    /// Returns a sender for updates to the specified append-only introspection collection.
679    ///
680    /// # Panics
681    ///
682    /// Panics if the given introspection type is not associated with an append-only collection.
683    fn append_only_introspection_tx(
684        &self,
685        type_: IntrospectionType,
686    ) -> mpsc::UnboundedSender<(
687        Vec<AppendOnlyUpdate>,
688        oneshot::Sender<Result<(), StorageError>>,
689    )>;
690
691    /// Returns a sender for updates to the specified differential introspection collection.
692    ///
693    /// # Panics
694    ///
695    /// Panics if the given introspection type is not associated with a differential collection.
696    fn differential_introspection_tx(
697        &self,
698        type_: IntrospectionType,
699    ) -> mpsc::UnboundedSender<(StorageWriteOp, oneshot::Sender<Result<(), StorageError>>)>;
700
701    async fn real_time_recent_timestamp(
702        &self,
703        source_ids: BTreeSet<GlobalId>,
704        timeout: Duration,
705    ) -> Result<BoxFuture<Result<Timestamp, StorageError>>, StorageError>;
706
707    /// Returns the state of the [`StorageController`] formatted as JSON.
708    fn dump(&self) -> Result<serde_json::Value, anyhow::Error>;
709}
710
711impl DataSource {
712    /// Returns true if the storage controller manages the data shard for this
713    /// source using txn-wal.
714    pub fn in_txns(&self) -> bool {
715        match self {
716            DataSource::Table => true,
717            DataSource::Other
718            | DataSource::Ingestion(_)
719            | DataSource::IngestionExport { .. }
720            | DataSource::Introspection(_)
721            | DataSource::Progress
722            | DataSource::Webhook => false,
723            DataSource::Sink { .. } => false,
724        }
725    }
726}
727
728/// A wrapper struct that presents the adapter token to a format that is understandable by persist
729/// and also allows us to differentiate between a token being present versus being set for the
730/// first time.
731#[derive(PartialEq, Clone, Debug, Default)]
732pub struct PersistEpoch(pub Option<NonZeroI64>);
733
734impl Codec64 for PersistEpoch {
735    fn codec_name() -> String {
736        "PersistEpoch".to_owned()
737    }
738
739    fn encode(&self) -> [u8; 8] {
740        self.0.map(NonZeroI64::get).unwrap_or(0).to_le_bytes()
741    }
742
743    fn decode(buf: [u8; 8]) -> Self {
744        Self(NonZeroI64::new(i64::from_le_bytes(buf)))
745    }
746}
747
748impl From<NonZeroI64> for PersistEpoch {
749    fn from(epoch: NonZeroI64) -> Self {
750        Self(Some(epoch))
751    }
752}
753
/// State maintained about individual exports.
#[derive(Debug)]
pub struct ExportState {
    /// Really only for keeping track of changes to the `derived_since`.
    pub read_capabilities: MutableAntichain<Timestamp>,

    /// The cluster this export is associated with.
    pub cluster_id: StorageInstanceId,

    /// The current since frontier, derived from `write_frontier` using
    /// `read_policy`.
    pub derived_since: Antichain<Timestamp>,

    /// The read holds that this export has on its dependencies (its input and itself). When
    /// the upper of the export changes, we downgrade this, which in turn
    /// downgrades holds we have on our dependencies' sinces.
    ///
    /// Index 0 is the hold on the export's input (see `input_hold`); index 1
    /// is the hold on the export itself.
    pub read_holds: [ReadHold; 2],

    /// The policy to use to downgrade `self.derived_since`.
    pub read_policy: ReadPolicy,

    /// Reported write frontier.
    pub write_frontier: Antichain<Timestamp>,
}
778
779impl ExportState {
780    pub fn new(
781        cluster_id: StorageInstanceId,
782        read_hold: ReadHold,
783        self_hold: ReadHold,
784        write_frontier: Antichain<Timestamp>,
785        read_policy: ReadPolicy,
786    ) -> Self {
787        let mut dependency_since = Antichain::from_elem(Timestamp::MIN);
788        for read_hold in [&read_hold, &self_hold] {
789            dependency_since.join_assign(read_hold.since());
790        }
791        Self {
792            read_capabilities: MutableAntichain::from(dependency_since.borrow()),
793            cluster_id,
794            derived_since: dependency_since,
795            read_holds: [read_hold, self_hold],
796            read_policy,
797            write_frontier,
798        }
799    }
800
801    /// Returns the cluster to which the export is bound.
802    pub fn cluster_id(&self) -> StorageInstanceId {
803        self.cluster_id
804    }
805
806    /// Returns the cluster to which the export is bound.
807    pub fn input_hold(&self) -> &ReadHold {
808        &self.read_holds[0]
809    }
810
811    /// Returns whether the export was dropped.
812    pub fn is_dropped(&self) -> bool {
813        self.read_holds.iter().all(|h| h.since().is_empty())
814    }
815}
/// A channel that allows you to append a set of updates to a pre-defined [`GlobalId`].
///
/// See `CollectionManager::monotonic_appender` to acquire a [`MonotonicAppender`].
#[derive(Clone, Debug)]
pub struct MonotonicAppender {
    /// Channel that sends to a [`tokio::task`] which pushes updates to Persist.
    ///
    /// Each message pairs the updates to append with a oneshot sender on
    /// which the receiving task reports the outcome of the append.
    tx: mpsc::UnboundedSender<(
        Vec<AppendOnlyUpdate>,
        oneshot::Sender<Result<(), StorageError>>,
    )>,
}
827
828impl MonotonicAppender {
829    pub fn new(
830        tx: mpsc::UnboundedSender<(
831            Vec<AppendOnlyUpdate>,
832            oneshot::Sender<Result<(), StorageError>>,
833        )>,
834    ) -> Self {
835        MonotonicAppender { tx }
836    }
837
838    pub async fn append(&self, updates: Vec<AppendOnlyUpdate>) -> Result<(), StorageError> {
839        let (tx, rx) = oneshot::channel();
840
841        // Send our update to the CollectionManager.
842        self.tx
843            .send((updates, tx))
844            .map_err(|_| StorageError::ShuttingDown("collection manager"))?;
845
846        // Wait for a response, if we fail to receive then the CollectionManager has gone away.
847        let result = rx
848            .await
849            .map_err(|_| StorageError::ShuttingDown("collection manager"))?;
850
851        result
852    }
853}
854
/// A wallclock lag measurement.
///
/// The enum representation reflects the fact that wallclock lag is undefined for unreadable
/// collections, i.e. collections that contain no readable times.
///
/// `Undefined` is declared after `Seconds`, so the derived `Ord` treats it as
/// greater than any `Seconds` value, consistent with [`WallclockLag::max`].
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum WallclockLag {
    /// Lag value in seconds, for readable collections.
    Seconds(u64),
    /// Undefined lag, for unreadable collections.
    Undefined,
}
866
867impl WallclockLag {
868    /// The smallest possible wallclock lag measurement.
869    pub const MIN: Self = Self::Seconds(0);
870
871    /// Return the maximum of two lag values.
872    ///
873    /// We treat `Undefined` lags as greater than `Seconds`, to ensure we never report low lag
874    /// values when a collection was actually unreadable for some amount of time.
875    pub fn max(self, other: Self) -> Self {
876        match (self, other) {
877            (Self::Seconds(a), Self::Seconds(b)) => Self::Seconds(a.max(b)),
878            (Self::Undefined, _) | (_, Self::Undefined) => Self::Undefined,
879        }
880    }
881
882    /// Return the wrapped seconds value, or a default if the lag is `Undefined`.
883    pub fn unwrap_seconds_or(self, default: u64) -> u64 {
884        match self {
885            Self::Seconds(s) => s,
886            Self::Undefined => default,
887        }
888    }
889
890    /// Create a new `WallclockLag` by transforming the wrapped seconds value.
891    pub fn map_seconds(self, f: impl FnOnce(u64) -> u64) -> Self {
892        match self {
893            Self::Seconds(s) => Self::Seconds(f(s)),
894            Self::Undefined => Self::Undefined,
895        }
896    }
897
898    /// Convert this lag value into a [`Datum::Interval`] or [`Datum::Null`].
899    pub fn into_interval_datum(self) -> Datum<'static> {
900        match self {
901            Self::Seconds(secs) => {
902                let micros = i64::try_from(secs * 1_000_000).expect("must fit");
903                Datum::Interval(Interval::new(0, 0, micros))
904            }
905            Self::Undefined => Datum::Null,
906        }
907    }
908
909    /// Convert this lag value into a [`Datum::UInt64`] or [`Datum::Null`].
910    pub fn into_uint64_datum(self) -> Datum<'static> {
911        match self {
912            Self::Seconds(secs) => Datum::UInt64(secs),
913            Self::Undefined => Datum::Null,
914        }
915    }
916}
917
/// The period covered by a wallclock lag histogram, represented as a `[start, end)` range.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct WallclockLagHistogramPeriod {
    /// Inclusive start of the period.
    pub start: CheckedTimestamp<DateTime<Utc>>,
    /// Exclusive end of the period.
    pub end: CheckedTimestamp<DateTime<Utc>>,
}
924
925impl WallclockLagHistogramPeriod {
926    /// Construct a `WallclockLagHistogramPeriod` from the given epoch timestamp and dyncfg.
927    pub fn from_epoch_millis(epoch_ms: u64, dyncfg: &ConfigSet) -> Self {
928        let interval = WALLCLOCK_LAG_HISTOGRAM_PERIOD_INTERVAL.get(dyncfg);
929        let interval_ms = u64::try_from(interval.as_millis()).unwrap_or_else(|_| {
930            soft_panic_or_log!("excessive wallclock lag histogram period interval: {interval:?}");
931            let default = WALLCLOCK_LAG_HISTOGRAM_PERIOD_INTERVAL.default();
932            u64::try_from(default.as_millis()).unwrap()
933        });
934        let interval_ms = std::cmp::max(interval_ms, 1);
935
936        let start_ms = epoch_ms - (epoch_ms % interval_ms);
937        let start_dt = mz_ore::now::to_datetime(start_ms);
938        let start = start_dt.try_into().expect("must fit");
939
940        let end_ms = start_ms + interval_ms;
941        let end_dt = mz_ore::now::to_datetime(end_ms);
942        let end = end_dt.try_into().expect("must fit");
943
944        Self { start, end }
945    }
946}
947
#[cfg(test)]
mod tests {
    use super::*;

    /// A `lag_writes_by` policy with zero lag must leave the write frontier
    /// unchanged.
    #[mz_ore::test]
    fn lag_writes_by_zero() {
        let zero = mz_repr::Timestamp::default();
        let policy = ReadPolicy::lag_writes_by(zero, zero);
        let frontier = Antichain::from_elem(mz_repr::Timestamp::from(5));
        assert_eq!(policy.frontier(frontier.borrow()), frontier);
    }
}
959}