mz_storage_client/
controller.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! A controller that provides an interface to the storage layer.
11//!
12//! The storage controller curates the creation of sources, the progress of readers through these collections,
13//! and their eventual dropping and resource reclamation.
14//!
//! The storage controller can be viewed as a partial map from `GlobalId` to collection. It is an error to
//! use an identifier before it has been "created" with `create_collections()`. Once created, the controller
//! holds a read capability for each collection, which is manipulated with `update_read_capabilities()`.
18//! Eventually, the source is dropped with either `drop_sources()` or by allowing compaction to the
19//! empty frontier.
20
21use std::collections::{BTreeMap, BTreeSet};
22use std::fmt::Debug;
23use std::future::Future;
24use std::num::NonZeroI64;
25use std::pin::Pin;
26use std::sync::Arc;
27use std::time::Duration;
28
29use async_trait::async_trait;
30use chrono::{DateTime, Utc};
31use differential_dataflow::lattice::Lattice;
32use mz_cluster_client::ReplicaId;
33use mz_cluster_client::client::ClusterReplicaLocation;
34use mz_controller_types::dyncfgs::WALLCLOCK_LAG_HISTOGRAM_PERIOD_INTERVAL;
35use mz_dyncfg::ConfigSet;
36use mz_ore::soft_panic_or_log;
37use mz_persist_client::batch::ProtoBatch;
38use mz_persist_types::{Codec64, Opaque, ShardId};
39use mz_repr::adt::interval::Interval;
40use mz_repr::adt::timestamp::CheckedTimestamp;
41use mz_repr::{Datum, Diff, GlobalId, RelationDesc, RelationVersion, Row};
42use mz_storage_types::configuration::StorageConfiguration;
43use mz_storage_types::connections::inline::InlinedConnection;
44use mz_storage_types::controller::{CollectionMetadata, StorageError};
45use mz_storage_types::instances::StorageInstanceId;
46use mz_storage_types::oneshot_sources::{OneshotIngestionRequest, OneshotResultCallback};
47use mz_storage_types::parameters::StorageParameters;
48use mz_storage_types::read_holds::ReadHold;
49use mz_storage_types::read_policy::ReadPolicy;
50use mz_storage_types::sinks::{StorageSinkConnection, StorageSinkDesc};
51use mz_storage_types::sources::{
52    GenericSourceConnection, IngestionDescription, SourceDesc, SourceExportDataConfig,
53    SourceExportDetails, Timeline,
54};
55use serde::{Deserialize, Serialize};
56use timely::progress::Timestamp as TimelyTimestamp;
57use timely::progress::frontier::MutableAntichain;
58use timely::progress::{Antichain, Timestamp};
59use tokio::sync::{mpsc, oneshot};
60
61use crate::client::{AppendOnlyUpdate, StatusUpdate, TableData};
62use crate::statistics::WebhookStatistics;
63
64#[derive(Clone, Copy, Debug, Serialize, Deserialize, Eq, PartialEq, Hash, PartialOrd, Ord)]
65pub enum IntrospectionType {
66    /// We're not responsible for appending to this collection automatically, but we should
67    /// automatically bump the write frontier from time to time.
68    SinkStatusHistory,
69    SourceStatusHistory,
70    ShardMapping,
71
72    Frontiers,
73    ReplicaFrontiers,
74
75    ReplicaStatusHistory,
76    ReplicaMetricsHistory,
77    WallclockLagHistory,
78    WallclockLagHistogram,
79
80    // Note that this single-shard introspection source will be changed to per-replica,
81    // once we allow multiplexing multiple sources/sinks on a single cluster.
82    StorageSourceStatistics,
83    StorageSinkStatistics,
84
85    // The below are for statement logging.
86    StatementExecutionHistory,
87    SessionHistory,
88    PreparedStatementHistory,
89    SqlText,
90    // For statement lifecycle logging, which is closely related
91    // to statement logging
92    StatementLifecycleHistory,
93
94    // Collections written by the compute controller.
95    ComputeDependencies,
96    ComputeOperatorHydrationStatus,
97    ComputeMaterializedViewRefreshes,
98    ComputeErrorCounts,
99    ComputeHydrationTimes,
100
101    // Written by the Adapter for tracking AWS PrivateLink Connection Status History
102    PrivatelinkConnectionStatusHistory,
103}
104
105/// Describes how data is written to the collection.
106#[derive(Clone, Debug, Eq, PartialEq)]
107pub enum DataSource<T> {
108    /// Ingest data from some external source.
109    Ingestion(IngestionDescription),
110    /// This source receives its data from the identified ingestion,
111    /// from an external object identified using `SourceExportDetails`.
112    ///
113    /// The referenced ingestion must be created before all of its exports.
114    IngestionExport {
115        ingestion_id: GlobalId,
116        details: SourceExportDetails,
117        data_config: SourceExportDataConfig,
118    },
119    /// Data comes from introspection sources, which the controller itself is
120    /// responsible for generating.
121    Introspection(IntrospectionType),
122    /// Data comes from the source's remapping/reclock operator.
123    Progress,
124    /// Data comes from external HTTP requests pushed to Materialize.
125    Webhook,
126    /// The adapter layer appends timestamped data, i.e. it is a `TABLE`.
127    Table {
128        /// This table has had columns added or dropped to it, so we're now a
129        /// "view" over the "primary" Table/collection. Within the
130        /// `storage-controller` we the primary as a dependency.
131        primary: Option<GlobalId>,
132    },
133    /// This source's data does not need to be managed by the storage
134    /// controller, e.g. it's a materialized view or the catalog collection.
135    Other,
136    /// This collection is the output collection of a sink.
137    Sink { desc: ExportDescription<T> },
138}
139
140/// Describes a request to create a source.
141#[derive(Clone, Debug, Eq, PartialEq)]
142pub struct CollectionDescription<T> {
143    /// The schema of this collection
144    pub desc: RelationDesc,
145    /// The source of this collection's data.
146    pub data_source: DataSource<T>,
147    /// An optional frontier to which the collection's `since` should be advanced.
148    pub since: Option<Antichain<T>>,
149    /// A GlobalId to use for this collection to use for the status collection.
150    /// Used to keep track of source status/error information.
151    pub status_collection_id: Option<GlobalId>,
152    /// The timeline of the source. Absent for materialized views, continual tasks, etc.
153    pub timeline: Option<Timeline>,
154}
155
156impl<T> CollectionDescription<T> {
157    pub fn for_table(desc: RelationDesc, primary: Option<GlobalId>) -> Self {
158        Self {
159            desc,
160            data_source: DataSource::Table { primary },
161            since: None,
162            status_collection_id: None,
163            timeline: Some(Timeline::EpochMilliseconds),
164        }
165    }
166}
167
168#[derive(Clone, Debug, Eq, PartialEq)]
169pub struct ExportDescription<T = mz_repr::Timestamp> {
170    pub sink: StorageSinkDesc<(), T>,
171    /// The ID of the instance in which to install the export.
172    pub instance_id: StorageInstanceId,
173}
174
175#[derive(Debug)]
176pub enum Response<T> {
177    FrontierUpdates(Vec<(GlobalId, Antichain<T>)>),
178}
179
180/// Metadata that the storage controller must know to properly handle the life
181/// cycle of creating and dropping collections.
182///
183/// This data should be kept consistent with the state modified using
184/// [`StorageTxn`].
185///
186/// n.b. the "txn WAL shard" is also metadata that's persisted, but if we
187/// included it in this struct it would never be read.
188#[derive(Debug, Clone, Serialize, Default)]
189pub struct StorageMetadata {
190    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
191    pub collection_metadata: BTreeMap<GlobalId, ShardId>,
192    pub unfinalized_shards: BTreeSet<ShardId>,
193}
194
195impl StorageMetadata {
196    pub fn get_collection_shard<T>(&self, id: GlobalId) -> Result<ShardId, StorageError<T>> {
197        let shard_id = self
198            .collection_metadata
199            .get(&id)
200            .ok_or(StorageError::IdentifierMissing(id))?;
201
202        Ok(*shard_id)
203    }
204}
205
206/// Provides an interface for the storage controller to read and write data that
207/// is recorded elsewhere.
208///
209/// Data written to the implementor of this trait should make a consistent view
210/// of the data available through [`StorageMetadata`].
211#[async_trait]
212pub trait StorageTxn<T> {
213    /// Retrieve all of the visible storage metadata.
214    ///
215    /// The value of this map should be treated as opaque.
216    fn get_collection_metadata(&self) -> BTreeMap<GlobalId, ShardId>;
217
218    /// Add new storage metadata for a collection.
219    ///
220    /// Subsequent calls to [`StorageTxn::get_collection_metadata`] must include
221    /// this data.
222    fn insert_collection_metadata(
223        &mut self,
224        s: BTreeMap<GlobalId, ShardId>,
225    ) -> Result<(), StorageError<T>>;
226
227    /// Remove the metadata associated with the identified collections.
228    ///
229    /// Subsequent calls to [`StorageTxn::get_collection_metadata`] must not
230    /// include these keys.
231    fn delete_collection_metadata(&mut self, ids: BTreeSet<GlobalId>) -> Vec<(GlobalId, ShardId)>;
232
233    /// Retrieve all of the shards that are no longer in use by an active
234    /// collection but are yet to be finalized.
235    fn get_unfinalized_shards(&self) -> BTreeSet<ShardId>;
236
237    /// Insert the specified values as unfinalized shards.
238    fn insert_unfinalized_shards(&mut self, s: BTreeSet<ShardId>) -> Result<(), StorageError<T>>;
239
240    /// Mark the specified shards as finalized, deleting them from the
241    /// unfinalized shard collection.
242    fn mark_shards_as_finalized(&mut self, shards: BTreeSet<ShardId>);
243
244    /// Get the txn WAL shard for this environment if it exists.
245    fn get_txn_wal_shard(&self) -> Option<ShardId>;
246
247    /// Store the specified shard as the environment's txn WAL shard.
248    ///
249    /// The implementor should error if the shard is already specified.
250    fn write_txn_wal_shard(&mut self, shard: ShardId) -> Result<(), StorageError<T>>;
251}
252
/// A pinned, heap-allocated future that is `Send`, holds no non-`'static`
/// borrows, and resolves to a value of type `T`.
pub type BoxFuture<T> = Pin<Box<dyn Future<Output = T> + 'static + Send>>;
254
255/// A predicate for a `Row` filter.
256pub type RowPredicate = Box<dyn Fn(&Row) -> bool + Send + Sync>;
257
258/// High-level write operations applicable to storage collections.
259pub enum StorageWriteOp {
260    /// Append a set of rows with specified multiplicities.
261    ///
262    /// The multiplicities may be negative, so an `Append` operation can perform
263    /// both insertions and retractions.
264    Append { updates: Vec<(Row, Diff)> },
265    /// Delete all rows matching the given predicate.
266    Delete { filter: RowPredicate },
267}
268
269impl StorageWriteOp {
270    /// Returns whether this operation appends an empty set of updates.
271    pub fn is_empty_append(&self) -> bool {
272        match self {
273            Self::Append { updates } => updates.is_empty(),
274            Self::Delete { .. } => false,
275        }
276    }
277}
278
279#[async_trait(?Send)]
280pub trait StorageController: Debug {
281    type Timestamp: TimelyTimestamp;
282
283    /// Marks the end of any initialization commands.
284    ///
285    /// The implementor may wait for this method to be called before implementing prior commands,
286    /// and so it is important for a user to invoke this method as soon as it is comfortable.
287    /// This method can be invoked immediately, at the potential expense of performance.
288    fn initialization_complete(&mut self);
289
290    /// Update storage configuration with new parameters.
291    fn update_parameters(&mut self, config_params: StorageParameters);
292
293    /// Get the current configuration, including parameters updated with `update_parameters`.
294    fn config(&self) -> &StorageConfiguration;
295
296    /// Returns the [CollectionMetadata] of the collection identified by `id`.
297    fn collection_metadata(
298        &self,
299        id: GlobalId,
300    ) -> Result<CollectionMetadata, StorageError<Self::Timestamp>>;
301
302    /// Returns `true` iff the given collection/ingestion has been hydrated.
303    ///
304    /// For this check, zero-replica clusters are always considered hydrated.
305    /// Their collections would never normally be considered hydrated but it's
306    /// clearly intentional that they have no replicas.
307    fn collection_hydrated(
308        &self,
309        collection_id: GlobalId,
310    ) -> Result<bool, StorageError<Self::Timestamp>>;
311
312    /// Returns `true` if each non-transient, non-excluded collection is
313    /// hydrated on at least one of the provided replicas.
314    ///
315    /// If no replicas are provided, this checks for hydration on _any_ replica.
316    ///
317    /// This also returns `true` in case this cluster does not have any
318    /// replicas.
319    fn collections_hydrated_on_replicas(
320        &self,
321        target_replica_ids: Option<Vec<ReplicaId>>,
322        target_cluster_ids: &StorageInstanceId,
323        exclude_collections: &BTreeSet<GlobalId>,
324    ) -> Result<bool, StorageError<Self::Timestamp>>;
325
326    /// Returns the since/upper frontiers of the identified collection.
327    fn collection_frontiers(
328        &self,
329        id: GlobalId,
330    ) -> Result<
331        (Antichain<Self::Timestamp>, Antichain<Self::Timestamp>),
332        StorageError<Self::Timestamp>,
333    >;
334
335    /// Returns the since/upper frontiers of the identified collections.
336    ///
337    /// Having a method that returns both frontiers at the same time, for all
338    /// requested collections, ensures that we can get a consistent "snapshot"
339    /// of collection state. If we had separate methods instead, and/or would
340    /// allow getting frontiers for collections one at a time, it could happen
341    /// that collection state changes concurrently, while information is
342    /// gathered.
343    fn collections_frontiers(
344        &self,
345        id: Vec<GlobalId>,
346    ) -> Result<
347        Vec<(
348            GlobalId,
349            Antichain<Self::Timestamp>,
350            Antichain<Self::Timestamp>,
351        )>,
352        StorageError<Self::Timestamp>,
353    >;
354
355    /// Acquire an iterator over [CollectionMetadata] for all active
356    /// collections.
357    ///
358    /// A collection is "active" when it has a non empty frontier of read
359    /// capabilties.
360    fn active_collection_metadatas(&self) -> Vec<(GlobalId, CollectionMetadata)>;
361
362    /// Returns the IDs of all active ingestions for the given storage instance.
363    fn active_ingestions(
364        &self,
365        instance_id: StorageInstanceId,
366    ) -> Box<dyn Iterator<Item = &GlobalId> + '_>;
367
368    /// Checks whether a collection exists under the given `GlobalId`. Returns
369    /// an error if the collection does not exist.
370    fn check_exists(&self, id: GlobalId) -> Result<(), StorageError<Self::Timestamp>>;
371
372    /// Creates a storage instance with the specified ID.
373    ///
374    /// A storage instance can have zero or one replicas. The instance is
375    /// created with zero replicas.
376    ///
377    /// Panics if a storage instance with the given ID already exists.
378    fn create_instance(&mut self, id: StorageInstanceId, workload_class: Option<String>);
379
380    /// Drops the storage instance with the given ID.
381    ///
382    /// If you call this method while the storage instance has a replica
383    /// attached, that replica will be leaked. Call `drop_replica` first.
384    ///
385    /// Panics if a storage instance with the given ID does not exist.
386    fn drop_instance(&mut self, id: StorageInstanceId);
387
388    /// Updates a storage instance's workload class.
389    fn update_instance_workload_class(
390        &mut self,
391        id: StorageInstanceId,
392        workload_class: Option<String>,
393    );
394
395    /// Connects the storage instance to the specified replica.
396    ///
397    /// If the storage instance is already attached to a replica, communication
398    /// with that replica is severed in favor of the new replica.
399    ///
400    /// In the future, this API will be adjusted to support active replication
401    /// of storage instances (i.e., multiple replicas attached to a given
402    /// storage instance).
403    fn connect_replica(
404        &mut self,
405        instance_id: StorageInstanceId,
406        replica_id: ReplicaId,
407        location: ClusterReplicaLocation,
408        enable_ctp: bool,
409    );
410
411    /// Disconnects the storage instance from the specified replica.
412    fn drop_replica(&mut self, instance_id: StorageInstanceId, replica_id: ReplicaId);
413
414    /// Across versions of Materialize the nullability of columns for some objects can change based
415    /// on updates to our optimizer.
416    ///
417    /// During bootstrap we will register these new schemas with Persist.
418    ///
419    /// See: <https://github.com/MaterializeInc/database-issues/issues/2488>
420    async fn evolve_nullability_for_bootstrap(
421        &mut self,
422        storage_metadata: &StorageMetadata,
423        collections: Vec<(GlobalId, RelationDesc)>,
424    ) -> Result<(), StorageError<Self::Timestamp>>;
425
426    /// Create the sources described in the individual RunIngestionCommand commands.
427    ///
428    /// Each command carries the source id, the source description, and any associated metadata
429    /// needed to ingest the particular source.
430    ///
431    /// This command installs collection state for the indicated sources, and they are
432    /// now valid to use in queries at times beyond the initial `since` frontiers. Each
433    /// collection also acquires a read capability at this frontier, which will need to
434    /// be repeatedly downgraded with `allow_compaction()` to permit compaction.
435    ///
436    /// This method is NOT idempotent; It can fail between processing of different
437    /// collections and leave the controller in an inconsistent state. It is almost
438    /// always wrong to do anything but abort the process on `Err`.
439    ///
440    /// The `register_ts` is used as the initial timestamp that tables are available for reads. (We
441    /// might later give non-tables the same treatment, but hold off on that initially.) Callers
442    /// must provide a Some if any of the collections is a table. A None may be given if none of the
443    /// collections are a table (i.e. all materialized views, sources, etc).
444    async fn create_collections(
445        &mut self,
446        storage_metadata: &StorageMetadata,
447        register_ts: Option<Self::Timestamp>,
448        collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>,
449    ) -> Result<(), StorageError<Self::Timestamp>> {
450        self.create_collections_for_bootstrap(
451            storage_metadata,
452            register_ts,
453            collections,
454            &BTreeSet::new(),
455        )
456        .await
457    }
458
459    /// Like [`Self::create_collections`], except used specifically for bootstrap.
460    ///
461    /// `migrated_storage_collections` is a set of migrated storage collections to be excluded
462    /// from the txn-wal sub-system.
463    async fn create_collections_for_bootstrap(
464        &mut self,
465        storage_metadata: &StorageMetadata,
466        register_ts: Option<Self::Timestamp>,
467        collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>,
468        migrated_storage_collections: &BTreeSet<GlobalId>,
469    ) -> Result<(), StorageError<Self::Timestamp>>;
470
471    /// Check that the ingestion associated with `id` can use the provided
472    /// [`SourceDesc`].
473    ///
474    /// Note that this check is optimistic and its return of `Ok(())` does not
475    /// guarantee that subsequent calls to `alter_ingestion_source_desc` are
476    /// guaranteed to succeed.
477    fn check_alter_ingestion_source_desc(
478        &mut self,
479        ingestion_id: GlobalId,
480        source_desc: &SourceDesc,
481    ) -> Result<(), StorageError<Self::Timestamp>>;
482
483    /// Alters the identified collection to use the provided [`SourceDesc`].
484    async fn alter_ingestion_source_desc(
485        &mut self,
486        ingestion_id: GlobalId,
487        source_desc: SourceDesc,
488    ) -> Result<(), StorageError<Self::Timestamp>>;
489
490    /// Alters each identified collection to use the correlated [`GenericSourceConnection`].
491    async fn alter_ingestion_connections(
492        &mut self,
493        source_connections: BTreeMap<GlobalId, GenericSourceConnection<InlinedConnection>>,
494    ) -> Result<(), StorageError<Self::Timestamp>>;
495
496    /// Alters the data config for the specified source exports of the specified ingestions.
497    async fn alter_ingestion_export_data_configs(
498        &mut self,
499        source_exports: BTreeMap<GlobalId, SourceExportDataConfig>,
500    ) -> Result<(), StorageError<Self::Timestamp>>;
501
502    async fn alter_table_desc(
503        &mut self,
504        existing_collection: GlobalId,
505        new_collection: GlobalId,
506        new_desc: RelationDesc,
507        expected_version: RelationVersion,
508        register_ts: Self::Timestamp,
509    ) -> Result<(), StorageError<Self::Timestamp>>;
510
511    /// Acquire an immutable reference to the export state, should it exist.
512    fn export(
513        &self,
514        id: GlobalId,
515    ) -> Result<&ExportState<Self::Timestamp>, StorageError<Self::Timestamp>>;
516
517    /// Acquire a mutable reference to the export state, should it exist.
518    fn export_mut(
519        &mut self,
520        id: GlobalId,
521    ) -> Result<&mut ExportState<Self::Timestamp>, StorageError<Self::Timestamp>>;
522
523    /// Create a oneshot ingestion.
524    async fn create_oneshot_ingestion(
525        &mut self,
526        ingestion_id: uuid::Uuid,
527        collection_id: GlobalId,
528        instance_id: StorageInstanceId,
529        request: OneshotIngestionRequest,
530        result_tx: OneshotResultCallback<ProtoBatch>,
531    ) -> Result<(), StorageError<Self::Timestamp>>;
532
533    /// Cancel a oneshot ingestion.
534    fn cancel_oneshot_ingestion(
535        &mut self,
536        ingestion_id: uuid::Uuid,
537    ) -> Result<(), StorageError<Self::Timestamp>>;
538
539    /// Alter the sink identified by the given id to match the provided `ExportDescription`.
540    async fn alter_export(
541        &mut self,
542        id: GlobalId,
543        export: ExportDescription<Self::Timestamp>,
544    ) -> Result<(), StorageError<Self::Timestamp>>;
545
546    /// For each identified export, alter its [`StorageSinkConnection`].
547    async fn alter_export_connections(
548        &mut self,
549        exports: BTreeMap<GlobalId, StorageSinkConnection>,
550    ) -> Result<(), StorageError<Self::Timestamp>>;
551
552    /// Drops the read capability for the tables and allows their resources to be reclaimed.
553    fn drop_tables(
554        &mut self,
555        storage_metadata: &StorageMetadata,
556        identifiers: Vec<GlobalId>,
557        ts: Self::Timestamp,
558    ) -> Result<(), StorageError<Self::Timestamp>>;
559
560    /// Drops the read capability for the sources and allows their resources to be reclaimed.
561    fn drop_sources(
562        &mut self,
563        storage_metadata: &StorageMetadata,
564        identifiers: Vec<GlobalId>,
565    ) -> Result<(), StorageError<Self::Timestamp>>;
566
567    /// Drops the read capability for the sinks and allows their resources to be reclaimed.
568    fn drop_sinks(
569        &mut self,
570        storage_metadata: &StorageMetadata,
571        identifiers: Vec<GlobalId>,
572    ) -> Result<(), StorageError<Self::Timestamp>>;
573
574    /// Drops the read capability for the sinks and allows their resources to be reclaimed.
575    ///
576    /// TODO(jkosh44): This method does not validate the provided identifiers. Currently when the
577    ///     controller starts/restarts it has no durable state. That means that it has no way of
578    ///     remembering any past commands sent. In the future we plan on persisting state for the
579    ///     controller so that it is aware of past commands.
580    ///     Therefore this method is for dropping sinks that we know to have been previously
581    ///     created, but have been forgotten by the controller due to a restart.
582    ///     Once command history becomes durable we can remove this method and use the normal
583    ///     `drop_sinks`.
584    fn drop_sinks_unvalidated(
585        &mut self,
586        storage_metadata: &StorageMetadata,
587        identifiers: Vec<GlobalId>,
588    );
589
590    /// Drops the read capability for the sources and allows their resources to be reclaimed.
591    ///
592    /// TODO(jkosh44): This method does not validate the provided identifiers. Currently when the
593    ///     controller starts/restarts it has no durable state. That means that it has no way of
594    ///     remembering any past commands sent. In the future we plan on persisting state for the
595    ///     controller so that it is aware of past commands.
596    ///     Therefore this method is for dropping sources that we know to have been previously
597    ///     created, but have been forgotten by the controller due to a restart.
598    ///     Once command history becomes durable we can remove this method and use the normal
599    ///     `drop_sources`.
600    fn drop_sources_unvalidated(
601        &mut self,
602        storage_metadata: &StorageMetadata,
603        identifiers: Vec<GlobalId>,
604    ) -> Result<(), StorageError<Self::Timestamp>>;
605
606    /// Append `updates` into the local input named `id` and advance its upper to `upper`.
607    ///
608    /// The method returns a oneshot that can be awaited to indicate completion of the write.
609    /// The method may return an error, indicating an immediately visible error, and also the
610    /// oneshot may return an error if one is encountered during the write.
611    ///
612    /// All updates in `commands` are applied atomically.
613    // TODO(petrosagg): switch upper to `Antichain<Timestamp>`
614    fn append_table(
615        &mut self,
616        write_ts: Self::Timestamp,
617        advance_to: Self::Timestamp,
618        commands: Vec<(GlobalId, Vec<TableData>)>,
619    ) -> Result<
620        tokio::sync::oneshot::Receiver<Result<(), StorageError<Self::Timestamp>>>,
621        StorageError<Self::Timestamp>,
622    >;
623
624    /// Returns a [`MonotonicAppender`] which is a channel that can be used to monotonically
625    /// append to the specified [`GlobalId`].
626    fn monotonic_appender(
627        &self,
628        id: GlobalId,
629    ) -> Result<MonotonicAppender<Self::Timestamp>, StorageError<Self::Timestamp>>;
630
631    /// Returns a shared [`WebhookStatistics`] which can be used to report user-facing
632    /// statistics for this given webhhook, specified by the [`GlobalId`].
633    ///
634    // This is used to support a fairly special case, where a source needs to report statistics
635    // from outside the ordinary controller-clusterd path. Its possible to merge this with
636    // `monotonic_appender`, whose only current user is webhooks, but given that they will
637    // likely be moved to clusterd, we just leave this a special case.
638    fn webhook_statistics(
639        &self,
640        id: GlobalId,
641    ) -> Result<Arc<WebhookStatistics>, StorageError<Self::Timestamp>>;
642
643    /// Waits until the controller is ready to process a response.
644    ///
645    /// This method may block for an arbitrarily long time.
646    ///
647    /// When the method returns, the owner should call
648    /// [`StorageController::process`] to process the ready message.
649    ///
650    /// This method is cancellation safe.
651    async fn ready(&mut self);
652
653    /// Processes the work queued by [`StorageController::ready`].
654    fn process(
655        &mut self,
656        storage_metadata: &StorageMetadata,
657    ) -> Result<Option<Response<Self::Timestamp>>, anyhow::Error>;
658
659    /// Exposes the internal state of the data shard for debugging and QA.
660    ///
661    /// We'll be thoughtful about making unnecessary changes, but the **output
662    /// of this method needs to be gated from users**, so that it's not subject
663    /// to our backward compatibility guarantees.
664    ///
665    /// TODO: Ideally this would return `impl Serialize` so the caller can do
666    /// with it what they like, but that doesn't work in traits yet. The
667    /// workaround (an associated type) doesn't work because persist doesn't
668    /// want to make the type public. In the meantime, move the `serde_json`
669    /// call from the single user into this method.
670    async fn inspect_persist_state(&self, id: GlobalId)
671    -> Result<serde_json::Value, anyhow::Error>;
672
673    /// Records append-only updates for the given introspection type.
674    ///
675    /// Rows passed in `updates` MUST have the correct schema for the given
676    /// introspection type, as readers rely on this and might panic otherwise.
677    fn append_introspection_updates(&mut self, type_: IntrospectionType, updates: Vec<(Row, Diff)>);
678
679    /// Records append-only status updates for the given introspection type.
680    fn append_status_introspection_updates(
681        &mut self,
682        type_: IntrospectionType,
683        updates: Vec<StatusUpdate>,
684    );
685
686    /// Updates the desired state of the given introspection type.
687    ///
688    /// Rows passed in `op` MUST have the correct schema for the given
689    /// introspection type, as readers rely on this and might panic otherwise.
690    fn update_introspection_collection(&mut self, type_: IntrospectionType, op: StorageWriteOp);
691
692    /// Returns a sender for updates to the specified append-only introspection collection.
693    ///
694    /// # Panics
695    ///
696    /// Panics if the given introspection type is not associated with an append-only collection.
697    fn append_only_introspection_tx(
698        &self,
699        type_: IntrospectionType,
700    ) -> mpsc::UnboundedSender<(
701        Vec<AppendOnlyUpdate>,
702        oneshot::Sender<Result<(), StorageError<Self::Timestamp>>>,
703    )>;
704
705    /// Returns a sender for updates to the specified differential introspection collection.
706    ///
707    /// # Panics
708    ///
709    /// Panics if the given introspection type is not associated with a differential collection.
710    fn differential_introspection_tx(
711        &self,
712        type_: IntrospectionType,
713    ) -> mpsc::UnboundedSender<(
714        StorageWriteOp,
715        oneshot::Sender<Result<(), StorageError<Self::Timestamp>>>,
716    )>;
717
718    async fn real_time_recent_timestamp(
719        &self,
720        source_ids: BTreeSet<GlobalId>,
721        timeout: Duration,
722    ) -> Result<
723        BoxFuture<Result<Self::Timestamp, StorageError<Self::Timestamp>>>,
724        StorageError<Self::Timestamp>,
725    >;
726}
727
728impl<T> DataSource<T> {
729    /// Returns true if the storage controller manages the data shard for this
730    /// source using txn-wal.
731    pub fn in_txns(&self) -> bool {
732        match self {
733            DataSource::Table { .. } => true,
734            DataSource::Other
735            | DataSource::Ingestion(_)
736            | DataSource::IngestionExport { .. }
737            | DataSource::Introspection(_)
738            | DataSource::Progress
739            | DataSource::Webhook => false,
740            DataSource::Sink { .. } => false,
741        }
742    }
743}
744
/// Wrapper that presents the adapter token in a format persist understands.
///
/// Wrapping the epoch in an `Option` lets us tell apart a token that is already present
/// from one that is being set for the first time.
#[derive(Clone, Debug, PartialEq)]
pub struct PersistEpoch(pub Option<NonZeroI64>);
750
751impl Opaque for PersistEpoch {
752    fn initial() -> Self {
753        PersistEpoch(None)
754    }
755}
756
757impl Codec64 for PersistEpoch {
758    fn codec_name() -> String {
759        "PersistEpoch".to_owned()
760    }
761
762    fn encode(&self) -> [u8; 8] {
763        self.0.map(NonZeroI64::get).unwrap_or(0).to_le_bytes()
764    }
765
766    fn decode(buf: [u8; 8]) -> Self {
767        Self(NonZeroI64::new(i64::from_le_bytes(buf)))
768    }
769}
770
771impl From<NonZeroI64> for PersistEpoch {
772    fn from(epoch: NonZeroI64) -> Self {
773        Self(Some(epoch))
774    }
775}
776
777/// State maintained about individual exports.
778#[derive(Debug)]
779pub struct ExportState<T: TimelyTimestamp> {
780    /// Really only for keeping track of changes to the `derived_since`.
781    pub read_capabilities: MutableAntichain<T>,
782
783    /// The cluster this export is associated with.
784    pub cluster_id: StorageInstanceId,
785
786    /// The current since frontier, derived from `write_frontier` using
787    /// `hold_policy`.
788    pub derived_since: Antichain<T>,
789
790    /// The read holds that this export has on its dependencies (its input and itself). When
791    /// the upper of the export changes, we downgrade this, which in turn
792    /// downgrades holds we have on our dependencies' sinces.
793    pub read_holds: [ReadHold<T>; 2],
794
795    /// The policy to use to downgrade `self.read_capability`.
796    pub read_policy: ReadPolicy<T>,
797
798    /// Reported write frontier.
799    pub write_frontier: Antichain<T>,
800}
801
802impl<T: Timestamp> ExportState<T> {
803    pub fn new(
804        cluster_id: StorageInstanceId,
805        read_hold: ReadHold<T>,
806        self_hold: ReadHold<T>,
807        write_frontier: Antichain<T>,
808        read_policy: ReadPolicy<T>,
809    ) -> Self
810    where
811        T: Lattice,
812    {
813        let mut dependency_since = Antichain::from_elem(T::minimum());
814        for read_hold in [&read_hold, &self_hold] {
815            dependency_since.join_assign(read_hold.since());
816        }
817        Self {
818            read_capabilities: MutableAntichain::from(dependency_since.borrow()),
819            cluster_id,
820            derived_since: dependency_since,
821            read_holds: [read_hold, self_hold],
822            read_policy,
823            write_frontier,
824        }
825    }
826
827    /// Returns the cluster to which the export is bound.
828    pub fn cluster_id(&self) -> StorageInstanceId {
829        self.cluster_id
830    }
831
832    /// Returns the cluster to which the export is bound.
833    pub fn input_hold(&self) -> &ReadHold<T> {
834        &self.read_holds[0]
835    }
836
837    /// Returns whether the export was dropped.
838    pub fn is_dropped(&self) -> bool {
839        self.read_holds.iter().all(|h| h.since().is_empty())
840    }
841}
842/// A channel that allows you to append a set of updates to a pre-defined [`GlobalId`].
843///
844/// See `CollectionManager::monotonic_appender` to acquire a [`MonotonicAppender`].
845#[derive(Clone, Debug)]
846pub struct MonotonicAppender<T> {
847    /// Channel that sends to a [`tokio::task`] which pushes updates to Persist.
848    tx: mpsc::UnboundedSender<(
849        Vec<AppendOnlyUpdate>,
850        oneshot::Sender<Result<(), StorageError<T>>>,
851    )>,
852}
853
854impl<T> MonotonicAppender<T> {
855    pub fn new(
856        tx: mpsc::UnboundedSender<(
857            Vec<AppendOnlyUpdate>,
858            oneshot::Sender<Result<(), StorageError<T>>>,
859        )>,
860    ) -> Self {
861        MonotonicAppender { tx }
862    }
863
864    pub async fn append(&self, updates: Vec<AppendOnlyUpdate>) -> Result<(), StorageError<T>> {
865        let (tx, rx) = oneshot::channel();
866
867        // Send our update to the CollectionManager.
868        self.tx
869            .send((updates, tx))
870            .map_err(|_| StorageError::ShuttingDown("collection manager"))?;
871
872        // Wait for a response, if we fail to receive then the CollectionManager has gone away.
873        let result = rx
874            .await
875            .map_err(|_| StorageError::ShuttingDown("collection manager"))?;
876
877        result
878    }
879}
880
/// A measurement of wallclock lag.
///
/// Wallclock lag is only defined for collections that have readable times, so unreadable
/// collections report `Undefined`. Because `Undefined` is declared last, the derived ordering
/// ranks it above every `Seconds` value.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum WallclockLag {
    /// The lag of a readable collection, in seconds.
    Seconds(u64),
    /// The lag of an unreadable collection, for which lag is undefined.
    Undefined,
}
892
893impl WallclockLag {
894    /// The smallest possible wallclock lag measurement.
895    pub const MIN: Self = Self::Seconds(0);
896
897    /// Return the maximum of two lag values.
898    ///
899    /// We treat `Undefined` lags as greater than `Seconds`, to ensure we never report low lag
900    /// values when a collection was actually unreadable for some amount of time.
901    pub fn max(self, other: Self) -> Self {
902        match (self, other) {
903            (Self::Seconds(a), Self::Seconds(b)) => Self::Seconds(a.max(b)),
904            (Self::Undefined, _) | (_, Self::Undefined) => Self::Undefined,
905        }
906    }
907
908    /// Return the wrapped seconds value, or a default if the lag is `Undefined`.
909    pub fn unwrap_seconds_or(self, default: u64) -> u64 {
910        match self {
911            Self::Seconds(s) => s,
912            Self::Undefined => default,
913        }
914    }
915
916    /// Create a new `WallclockLag` by transforming the wrapped seconds value.
917    pub fn map_seconds(self, f: impl FnOnce(u64) -> u64) -> Self {
918        match self {
919            Self::Seconds(s) => Self::Seconds(f(s)),
920            Self::Undefined => Self::Undefined,
921        }
922    }
923
924    /// Convert this lag value into a [`Datum::Interval`] or [`Datum::Null`].
925    pub fn into_interval_datum(self) -> Datum<'static> {
926        match self {
927            Self::Seconds(secs) => {
928                let micros = i64::try_from(secs * 1_000_000).expect("must fit");
929                Datum::Interval(Interval::new(0, 0, micros))
930            }
931            Self::Undefined => Datum::Null,
932        }
933    }
934
935    /// Convert this lag value into a [`Datum::UInt64`] or [`Datum::Null`].
936    pub fn into_uint64_datum(self) -> Datum<'static> {
937        match self {
938            Self::Seconds(secs) => Datum::UInt64(secs),
939            Self::Undefined => Datum::Null,
940        }
941    }
942}
943
944/// The period covered by a wallclock lag histogram, represented as a `[start, end)` range.
945#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
946pub struct WallclockLagHistogramPeriod {
947    pub start: CheckedTimestamp<DateTime<Utc>>,
948    pub end: CheckedTimestamp<DateTime<Utc>>,
949}
950
951impl WallclockLagHistogramPeriod {
952    /// Construct a `WallclockLagHistogramPeriod` from the given epoch timestamp and dyncfg.
953    pub fn from_epoch_millis(epoch_ms: u64, dyncfg: &ConfigSet) -> Self {
954        let interval = WALLCLOCK_LAG_HISTOGRAM_PERIOD_INTERVAL.get(dyncfg);
955        let interval_ms = u64::try_from(interval.as_millis()).unwrap_or_else(|_| {
956            soft_panic_or_log!("excessive wallclock lag histogram period interval: {interval:?}");
957            let default = WALLCLOCK_LAG_HISTOGRAM_PERIOD_INTERVAL.default();
958            u64::try_from(default.as_millis()).unwrap()
959        });
960        let interval_ms = std::cmp::max(interval_ms, 1);
961
962        let start_ms = epoch_ms - (epoch_ms % interval_ms);
963        let start_dt = mz_ore::now::to_datetime(start_ms);
964        let start = start_dt.try_into().expect("must fit");
965
966        let end_ms = start_ms + interval_ms;
967        let end_dt = mz_ore::now::to_datetime(end_ms);
968        let end = end_dt.try_into().expect("must fit");
969
970        Self { start, end }
971    }
972}
973
#[cfg(test)]
mod tests {
    use super::*;

    /// A read policy built by `lag_writes_by` with both arguments at their default (zero)
    /// value reports the write frontier itself as its since frontier.
    #[mz_ore::test]
    fn lag_writes_by_zero() {
        let upper = Antichain::from_elem(mz_repr::Timestamp::from(5));
        let policy =
            ReadPolicy::lag_writes_by(mz_repr::Timestamp::default(), mz_repr::Timestamp::default());
        let since = policy.frontier(upper.borrow());
        assert_eq!(since, upper);
    }
}