mz_storage_client/controller.rs
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! A controller that provides an interface to the storage layer.
//!
//! The storage controller curates the creation of sources, the progress of
//! readers through these collections, and their eventual dropping and
//! resource reclamation.
//!
//! The storage controller can be viewed as a partial map from `GlobalId` to
//! collection. It is an error to use an identifier before it has been
//! "created" with `create_collections()`. Once created, the controller holds
//! a read capability for each collection, which can be downgraded to permit
//! compaction. Eventually, a collection is dropped with `drop_sources()` (or
//! the corresponding method for its kind) or by allowing compaction to the
//! empty frontier.
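//!
//! A minimal sketch of the intended lifecycle (hypothetical `controller`,
//! `metadata`, IDs, and timestamps; marked `ignore` because driving a real
//! controller requires a full runtime):
//!
//! ```ignore
//! // Create a table collection; tables require a registration timestamp.
//! let desc = CollectionDescription::for_table(relation_desc);
//! controller
//!     .create_collections(&metadata, Some(register_ts), vec![(id, desc)])
//!     .await?;
//! // Inspect its read (since) and write (upper) frontiers.
//! let (since, upper) = controller.collection_frontiers(id)?;
//! // Eventually drop it, allowing its resources to be reclaimed.
//! controller.drop_tables(&metadata, vec![id], drop_ts)?;
//! ```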

use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Debug;
use std::future::Future;
use std::num::NonZeroI64;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;

use async_trait::async_trait;
use chrono::{DateTime, Utc};
use differential_dataflow::lattice::Lattice;
use mz_cluster_client::ReplicaId;
use mz_cluster_client::client::ClusterReplicaLocation;
use mz_controller_types::dyncfgs::WALLCLOCK_LAG_HISTOGRAM_PERIOD_INTERVAL;
use mz_dyncfg::ConfigSet;
use mz_ore::soft_panic_or_log;
use mz_persist_client::batch::ProtoBatch;
use mz_persist_types::{Codec64, Opaque, ShardId};
use mz_repr::adt::interval::Interval;
use mz_repr::adt::timestamp::CheckedTimestamp;
use mz_repr::{Datum, Diff, GlobalId, RelationDesc, RelationVersion, Row};
use mz_storage_types::configuration::StorageConfiguration;
use mz_storage_types::connections::inline::InlinedConnection;
use mz_storage_types::controller::{CollectionMetadata, StorageError};
use mz_storage_types::errors::CollectionMissing;
use mz_storage_types::instances::StorageInstanceId;
use mz_storage_types::oneshot_sources::{OneshotIngestionRequest, OneshotResultCallback};
use mz_storage_types::parameters::StorageParameters;
use mz_storage_types::read_holds::ReadHold;
use mz_storage_types::read_policy::ReadPolicy;
use mz_storage_types::sinks::{StorageSinkConnection, StorageSinkDesc};
use mz_storage_types::sources::{
    GenericSourceConnection, IngestionDescription, SourceDesc, SourceExportDataConfig,
    SourceExportDetails, Timeline,
};
use serde::{Deserialize, Serialize};
use timely::progress::Timestamp as TimelyTimestamp;
use timely::progress::frontier::MutableAntichain;
use timely::progress::{Antichain, Timestamp};
use tokio::sync::{mpsc, oneshot};

use crate::client::{AppendOnlyUpdate, StatusUpdate, TableData};
use crate::statistics::WebhookStatistics;

#[derive(Clone, Copy, Debug, Serialize, Deserialize, Eq, PartialEq, Hash, PartialOrd, Ord)]
pub enum IntrospectionType {
    /// We're not responsible for appending to this collection automatically, but we should
    /// automatically bump the write frontier from time to time.
    SinkStatusHistory,
    SourceStatusHistory,
    ShardMapping,

    Frontiers,
    ReplicaFrontiers,

    ReplicaStatusHistory,
    ReplicaMetricsHistory,
    WallclockLagHistory,
    WallclockLagHistogram,

    // Note that this single-shard introspection source will be changed to per-replica,
    // once we allow multiplexing multiple sources/sinks on a single cluster.
    StorageSourceStatistics,
    StorageSinkStatistics,

    // The below are for statement logging.
    StatementExecutionHistory,
    SessionHistory,
    PreparedStatementHistory,
    SqlText,
    // For statement lifecycle logging, which is closely related
    // to statement logging.
    StatementLifecycleHistory,

    // Collections written by the compute controller.
    ComputeDependencies,
    ComputeOperatorHydrationStatus,
    ComputeMaterializedViewRefreshes,
    ComputeErrorCounts,
    ComputeHydrationTimes,

    // Written by the Adapter for tracking AWS PrivateLink connection status history.
    PrivatelinkConnectionStatusHistory,
}

/// Describes how data is written to the collection.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum DataSource<T> {
    /// Ingest data from some external source.
    Ingestion(IngestionDescription),
    /// This source receives its data from the identified ingestion,
    /// from an external object identified using `SourceExportDetails`.
    ///
    /// The referenced ingestion must be created before all of its exports.
    IngestionExport {
        ingestion_id: GlobalId,
        details: SourceExportDetails,
        data_config: SourceExportDataConfig,
    },
    /// Data comes from introspection sources, which the controller itself is
    /// responsible for generating.
    Introspection(IntrospectionType),
    /// Data comes from the source's remapping/reclock operator.
    Progress,
    /// Data comes from external HTTP requests pushed to Materialize.
    Webhook,
    /// The adapter layer appends timestamped data, i.e. it is a `TABLE`.
    Table,
    /// This source's data does not need to be managed by the storage
    /// controller, e.g. it's a materialized view or the catalog collection.
    Other,
    /// This collection is the output collection of a sink.
    Sink { desc: ExportDescription<T> },
}

/// Describes a request to create a source.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct CollectionDescription<T> {
    /// The schema of this collection.
    pub desc: RelationDesc,
    /// The source of this collection's data.
    pub data_source: DataSource<T>,
    /// An optional frontier to which the collection's `since` should be advanced.
    pub since: Option<Antichain<T>>,
    /// The timeline of the source. Absent for materialized views, continual tasks, etc.
    pub timeline: Option<Timeline>,
    /// The primary of this collection.
    ///
    /// Multiple storage collections can point to the same persist shard,
    /// possibly with different schemas. In such a configuration, we select one
    /// of the involved collections as the primary, which "owns" the persist
    /// shard. All other involved collections have a dependency on the primary.
    pub primary: Option<GlobalId>,
}

impl<T> CollectionDescription<T> {
    /// Create a `CollectionDescription` for [`DataSource::Other`].
    pub fn for_other(desc: RelationDesc, since: Option<Antichain<T>>) -> Self {
        Self {
            desc,
            data_source: DataSource::Other,
            since,
            timeline: None,
            primary: None,
        }
    }

    /// Create a `CollectionDescription` for a table.
    pub fn for_table(desc: RelationDesc) -> Self {
        Self {
            desc,
            data_source: DataSource::Table,
            since: None,
            timeline: Some(Timeline::EpochMilliseconds),
            primary: None,
        }
    }
}

#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ExportDescription<T = mz_repr::Timestamp> {
    pub sink: StorageSinkDesc<(), T>,
    /// The ID of the instance in which to install the export.
    pub instance_id: StorageInstanceId,
}

#[derive(Debug)]
pub enum Response<T> {
    FrontierUpdates(Vec<(GlobalId, Antichain<T>)>),
}

/// Metadata that the storage controller must know to properly handle the life
/// cycle of creating and dropping collections.
///
/// This data should be kept consistent with the state modified using
/// [`StorageTxn`].
///
/// n.b. the "txn WAL shard" is also metadata that's persisted, but if we
/// included it in this struct it would never be read.
#[derive(Debug, Clone, Serialize, Default)]
pub struct StorageMetadata {
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub collection_metadata: BTreeMap<GlobalId, ShardId>,
    pub unfinalized_shards: BTreeSet<ShardId>,
}

impl StorageMetadata {
    pub fn get_collection_shard<T>(&self, id: GlobalId) -> Result<ShardId, StorageError<T>> {
        let shard_id = self
            .collection_metadata
            .get(&id)
            .ok_or(StorageError::IdentifierMissing(id))?;

        Ok(*shard_id)
    }
}

/// Provides an interface for the storage controller to read and write data that
/// is recorded elsewhere.
///
/// Data written to the implementor of this trait should make a consistent view
/// of the data available through [`StorageMetadata`].
#[async_trait]
pub trait StorageTxn<T> {
    /// Retrieve all of the visible storage metadata.
    ///
    /// The value of this map should be treated as opaque.
    fn get_collection_metadata(&self) -> BTreeMap<GlobalId, ShardId>;

    /// Add new storage metadata for a collection.
    ///
    /// Subsequent calls to [`StorageTxn::get_collection_metadata`] must include
    /// this data.
    fn insert_collection_metadata(
        &mut self,
        s: BTreeMap<GlobalId, ShardId>,
    ) -> Result<(), StorageError<T>>;

    /// Remove the metadata associated with the identified collections.
    ///
    /// Subsequent calls to [`StorageTxn::get_collection_metadata`] must not
    /// include these keys.
    fn delete_collection_metadata(&mut self, ids: BTreeSet<GlobalId>) -> Vec<(GlobalId, ShardId)>;

    /// Retrieve all of the shards that are no longer in use by an active
    /// collection but are yet to be finalized.
    fn get_unfinalized_shards(&self) -> BTreeSet<ShardId>;

    /// Insert the specified values as unfinalized shards.
    fn insert_unfinalized_shards(&mut self, s: BTreeSet<ShardId>) -> Result<(), StorageError<T>>;

    /// Mark the specified shards as finalized, deleting them from the
    /// unfinalized shard collection.
    fn mark_shards_as_finalized(&mut self, shards: BTreeSet<ShardId>);

    /// Get the txn WAL shard for this environment if it exists.
    fn get_txn_wal_shard(&self) -> Option<ShardId>;

    /// Store the specified shard as the environment's txn WAL shard.
    ///
    /// The implementor should error if the shard is already specified.
    fn write_txn_wal_shard(&mut self, shard: ShardId) -> Result<(), StorageError<T>>;
}

pub type BoxFuture<T> = Pin<Box<dyn Future<Output = T> + Send + 'static>>;

/// A predicate for a `Row` filter.
pub type RowPredicate = Box<dyn Fn(&Row) -> bool + Send + Sync>;

/// High-level write operations applicable to storage collections.
pub enum StorageWriteOp {
    /// Append a set of rows with specified multiplicities.
    ///
    /// The multiplicities may be negative, so an `Append` operation can perform
    /// both insertions and retractions.
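    ///
    /// A hedged sketch of mixed insertion and retraction (hypothetical rows;
    /// assumes `Diff` converts from `i64`, as in `mz_repr`):
    ///
    /// ```ignore
    /// // Insert one copy of `row_a` and retract two copies of `row_b`.
    /// let op = StorageWriteOp::Append {
    ///     updates: vec![(row_a, Diff::from(1)), (row_b, Diff::from(-2))],
    /// };
    /// assert!(!op.is_empty_append());
    /// ```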
    Append { updates: Vec<(Row, Diff)> },
    /// Delete all rows matching the given predicate.
    Delete { filter: RowPredicate },
}

impl StorageWriteOp {
    /// Returns whether this operation appends an empty set of updates.
    pub fn is_empty_append(&self) -> bool {
        match self {
            Self::Append { updates } => updates.is_empty(),
            Self::Delete { .. } => false,
        }
    }
}

#[async_trait(?Send)]
pub trait StorageController: Debug {
    type Timestamp: TimelyTimestamp;

    /// Marks the end of any initialization commands.
    ///
    /// The implementor may wait for this method to be called before implementing prior commands,
    /// and so it is important for a user to invoke this method as soon as it is comfortable.
    /// This method can be invoked immediately, at the potential expense of performance.
    fn initialization_complete(&mut self);

    /// Update storage configuration with new parameters.
    fn update_parameters(&mut self, config_params: StorageParameters);

    /// Get the current configuration, including parameters updated with `update_parameters`.
    fn config(&self) -> &StorageConfiguration;

    /// Returns the [CollectionMetadata] of the collection identified by `id`.
    fn collection_metadata(&self, id: GlobalId) -> Result<CollectionMetadata, CollectionMissing>;

    /// Returns `true` iff the given collection/ingestion has been hydrated.
    ///
    /// For this check, zero-replica clusters are always considered hydrated.
    /// Their collections would never normally be considered hydrated, but it's
    /// clearly intentional that they have no replicas.
    fn collection_hydrated(
        &self,
        collection_id: GlobalId,
    ) -> Result<bool, StorageError<Self::Timestamp>>;

    /// Returns `true` if each non-transient, non-excluded collection is
    /// hydrated on at least one of the provided replicas.
    ///
    /// If no replicas are provided, this checks for hydration on _any_ replica.
    ///
    /// This also returns `true` in case this cluster does not have any
    /// replicas.
    fn collections_hydrated_on_replicas(
        &self,
        target_replica_ids: Option<Vec<ReplicaId>>,
        target_cluster_id: &StorageInstanceId,
        exclude_collections: &BTreeSet<GlobalId>,
    ) -> Result<bool, StorageError<Self::Timestamp>>;

    /// Returns the since/upper frontiers of the identified collection.
    fn collection_frontiers(
        &self,
        id: GlobalId,
    ) -> Result<(Antichain<Self::Timestamp>, Antichain<Self::Timestamp>), CollectionMissing>;

    /// Returns the since/upper frontiers of the identified collections.
    ///
    /// Having a method that returns both frontiers at the same time, for all
    /// requested collections, ensures that we can get a consistent "snapshot"
    /// of collection state. If we had separate methods instead, and/or would
    /// allow getting frontiers for collections one at a time, it could happen
    /// that collection state changes concurrently, while information is
    /// gathered.
    fn collections_frontiers(
        &self,
        ids: Vec<GlobalId>,
    ) -> Result<
        Vec<(
            GlobalId,
            Antichain<Self::Timestamp>,
            Antichain<Self::Timestamp>,
        )>,
        CollectionMissing,
    >;

    /// Acquire an iterator over [CollectionMetadata] for all active
    /// collections.
    ///
    /// A collection is "active" when it has a non-empty frontier of read
    /// capabilities.
    fn active_collection_metadatas(&self) -> Vec<(GlobalId, CollectionMetadata)>;

    /// Returns the IDs of ingestion exports running on the given instance. This
    /// includes the ingestion itself, if any, and running source tables (a.k.a.
    /// subsources).
    fn active_ingestion_exports(
        &self,
        instance_id: StorageInstanceId,
    ) -> Box<dyn Iterator<Item = &GlobalId> + '_>;

    /// Checks whether a collection exists under the given `GlobalId`. Returns
    /// an error if the collection does not exist.
    fn check_exists(&self, id: GlobalId) -> Result<(), StorageError<Self::Timestamp>>;

    /// Creates a storage instance with the specified ID.
    ///
    /// A storage instance can have zero or one replicas. The instance is
    /// created with zero replicas.
    ///
    /// Panics if a storage instance with the given ID already exists.
    fn create_instance(&mut self, id: StorageInstanceId, workload_class: Option<String>);

    /// Drops the storage instance with the given ID.
    ///
    /// If you call this method while the storage instance has a replica
    /// attached, that replica will be leaked. Call `drop_replica` first.
    ///
    /// Panics if a storage instance with the given ID does not exist.
    fn drop_instance(&mut self, id: StorageInstanceId);

    /// Updates a storage instance's workload class.
    fn update_instance_workload_class(
        &mut self,
        id: StorageInstanceId,
        workload_class: Option<String>,
    );

    /// Connects the storage instance to the specified replica.
    ///
    /// If the storage instance is already attached to a replica, communication
    /// with that replica is severed in favor of the new replica.
    ///
    /// In the future, this API will be adjusted to support active replication
    /// of storage instances (i.e., multiple replicas attached to a given
    /// storage instance).
    fn connect_replica(
        &mut self,
        instance_id: StorageInstanceId,
        replica_id: ReplicaId,
        location: ClusterReplicaLocation,
    );

    /// Disconnects the storage instance from the specified replica.
    fn drop_replica(&mut self, instance_id: StorageInstanceId, replica_id: ReplicaId);

    /// Across versions of Materialize the nullability of columns for some objects can change based
    /// on updates to our optimizer.
    ///
    /// During bootstrap we will register these new schemas with Persist.
    ///
    /// See: <https://github.com/MaterializeInc/database-issues/issues/2488>
    async fn evolve_nullability_for_bootstrap(
        &mut self,
        storage_metadata: &StorageMetadata,
        collections: Vec<(GlobalId, RelationDesc)>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Create the sources described in the individual `RunIngestionCommand` commands.
    ///
    /// Each command carries the source id, the source description, and any associated metadata
    /// needed to ingest the particular source.
    ///
    /// This command installs collection state for the indicated sources, and they are
    /// now valid to use in queries at times beyond the initial `since` frontiers. Each
    /// collection also acquires a read capability at this frontier, which will need to
    /// be repeatedly downgraded with `allow_compaction()` to permit compaction.
    ///
    /// This method is NOT idempotent; it can fail between processing of different
    /// collections and leave the controller in an inconsistent state. It is almost
    /// always wrong to do anything but abort the process on `Err`.
    ///
    /// The `register_ts` is used as the initial timestamp that tables are available for reads. (We
    /// might later give non-tables the same treatment, but hold off on that initially.) Callers
    /// must provide a `Some` if any of the collections is a table; `None` may be given if none of
    /// the collections is a table (i.e. they are all materialized views, sources, etc.).
    async fn create_collections(
        &mut self,
        storage_metadata: &StorageMetadata,
        register_ts: Option<Self::Timestamp>,
        collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        self.create_collections_for_bootstrap(
            storage_metadata,
            register_ts,
            collections,
            &BTreeSet::new(),
        )
        .await
    }

    /// Like [`Self::create_collections`], except used specifically for bootstrap.
    ///
    /// `migrated_storage_collections` is a set of migrated storage collections to be excluded
    /// from the txn-wal sub-system.
    async fn create_collections_for_bootstrap(
        &mut self,
        storage_metadata: &StorageMetadata,
        register_ts: Option<Self::Timestamp>,
        collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>,
        migrated_storage_collections: &BTreeSet<GlobalId>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Check that the ingestion associated with `id` can use the provided
    /// [`SourceDesc`].
    ///
    /// Note that this check is optimistic: a return of `Ok(())` does not
    /// guarantee that subsequent calls to `alter_ingestion_source_desc` will
    /// succeed.
    fn check_alter_ingestion_source_desc(
        &mut self,
        ingestion_id: GlobalId,
        source_desc: &SourceDesc,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Alters each identified collection to use the correlated [`GenericSourceConnection`].
    async fn alter_ingestion_connections(
        &mut self,
        source_connections: BTreeMap<GlobalId, GenericSourceConnection<InlinedConnection>>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Alters the data config for the specified source exports of the specified ingestions.
    async fn alter_ingestion_export_data_configs(
        &mut self,
        source_exports: BTreeMap<GlobalId, SourceExportDataConfig>,
    ) -> Result<(), StorageError<Self::Timestamp>>;
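
    /// Alters the identified table (`existing_collection`) to use the provided
    /// [`RelationDesc`], registering the evolved table under `new_collection`.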
    async fn alter_table_desc(
        &mut self,
        existing_collection: GlobalId,
        new_collection: GlobalId,
        new_desc: RelationDesc,
        expected_version: RelationVersion,
        register_ts: Self::Timestamp,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Acquire an immutable reference to the export state, should it exist.
    fn export(
        &self,
        id: GlobalId,
    ) -> Result<&ExportState<Self::Timestamp>, StorageError<Self::Timestamp>>;

    /// Acquire a mutable reference to the export state, should it exist.
    fn export_mut(
        &mut self,
        id: GlobalId,
    ) -> Result<&mut ExportState<Self::Timestamp>, StorageError<Self::Timestamp>>;

    /// Create a oneshot ingestion.
    async fn create_oneshot_ingestion(
        &mut self,
        ingestion_id: uuid::Uuid,
        collection_id: GlobalId,
        instance_id: StorageInstanceId,
        request: OneshotIngestionRequest,
        result_tx: OneshotResultCallback<ProtoBatch>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Cancel a oneshot ingestion.
    fn cancel_oneshot_ingestion(
        &mut self,
        ingestion_id: uuid::Uuid,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Alter the sink identified by the given id to match the provided `ExportDescription`.
    async fn alter_export(
        &mut self,
        id: GlobalId,
        export: ExportDescription<Self::Timestamp>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// For each identified export, alter its [`StorageSinkConnection`].
    async fn alter_export_connections(
        &mut self,
        exports: BTreeMap<GlobalId, StorageSinkConnection>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Drops the read capability for the tables and allows their resources to be reclaimed.
    fn drop_tables(
        &mut self,
        storage_metadata: &StorageMetadata,
        identifiers: Vec<GlobalId>,
        ts: Self::Timestamp,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Drops the read capability for the sources and allows their resources to be reclaimed.
    fn drop_sources(
        &mut self,
        storage_metadata: &StorageMetadata,
        identifiers: Vec<GlobalId>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Drops the read capability for the sinks and allows their resources to be reclaimed.
    fn drop_sinks(
        &mut self,
        storage_metadata: &StorageMetadata,
        identifiers: Vec<GlobalId>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Drops the read capability for the sinks and allows their resources to be reclaimed.
    ///
    /// TODO(jkosh44): This method does not validate the provided identifiers. Currently when the
    /// controller starts/restarts it has no durable state. That means that it has no way of
    /// remembering any past commands sent. In the future we plan on persisting state for the
    /// controller so that it is aware of past commands. Therefore this method is for dropping
    /// sinks that we know to have been previously created, but have been forgotten by the
    /// controller due to a restart. Once command history becomes durable we can remove this
    /// method and use the normal `drop_sinks`.
    fn drop_sinks_unvalidated(
        &mut self,
        storage_metadata: &StorageMetadata,
        identifiers: Vec<GlobalId>,
    );

    /// Drops the read capability for the sources and allows their resources to be reclaimed.
    ///
    /// TODO(jkosh44): This method does not validate the provided identifiers. Currently when the
    /// controller starts/restarts it has no durable state. That means that it has no way of
    /// remembering any past commands sent. In the future we plan on persisting state for the
    /// controller so that it is aware of past commands. Therefore this method is for dropping
    /// sources that we know to have been previously created, but have been forgotten by the
    /// controller due to a restart. Once command history becomes durable we can remove this
    /// method and use the normal `drop_sources`.
    fn drop_sources_unvalidated(
        &mut self,
        storage_metadata: &StorageMetadata,
        identifiers: Vec<GlobalId>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Append `updates` into the local input named `id` and advance its upper to `upper`.
    ///
    /// The method returns a oneshot that can be awaited to indicate completion of the write.
    /// The method may return an error, indicating an immediately visible error, and also the
    /// oneshot may return an error if one is encountered during the write.
    ///
    /// All updates in `commands` are applied atomically.
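    ///
    /// A hedged usage sketch (hypothetical `controller` and inputs; `ignore`d
    /// because it requires a live controller):
    ///
    /// ```ignore
    /// let rx = controller.append_table(write_ts, advance_to, commands)?;
    /// // The write is complete only once the oneshot resolves to `Ok`.
    /// rx.await.expect("controller shut down")?;
    /// ```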
    // TODO(petrosagg): switch upper to `Antichain<Timestamp>`
    fn append_table(
        &mut self,
        write_ts: Self::Timestamp,
        advance_to: Self::Timestamp,
        commands: Vec<(GlobalId, Vec<TableData>)>,
    ) -> Result<
        tokio::sync::oneshot::Receiver<Result<(), StorageError<Self::Timestamp>>>,
        StorageError<Self::Timestamp>,
    >;

    /// Returns a [`MonotonicAppender`] which is a channel that can be used to monotonically
    /// append to the specified [`GlobalId`].
    fn monotonic_appender(
        &self,
        id: GlobalId,
    ) -> Result<MonotonicAppender<Self::Timestamp>, StorageError<Self::Timestamp>>;

    /// Returns a shared [`WebhookStatistics`] which can be used to report user-facing
    /// statistics for the given webhook, specified by the [`GlobalId`].
    ///
    // This is used to support a fairly special case, where a source needs to report statistics
    // from outside the ordinary controller-clusterd path. It's possible to merge this with
    // `monotonic_appender`, whose only current user is webhooks, but given that they will
    // likely be moved to clusterd, we just leave this a special case.
    fn webhook_statistics(
        &self,
        id: GlobalId,
    ) -> Result<Arc<WebhookStatistics>, StorageError<Self::Timestamp>>;

    /// Waits until the controller is ready to process a response.
    ///
    /// This method may block for an arbitrarily long time.
    ///
    /// When the method returns, the owner should call
    /// [`StorageController::process`] to process the ready message.
    ///
    /// This method is cancellation safe.
    async fn ready(&mut self);

    /// Processes the work queued by [`StorageController::ready`].
    fn process(
        &mut self,
        storage_metadata: &StorageMetadata,
    ) -> Result<Option<Response<Self::Timestamp>>, anyhow::Error>;

    /// Exposes the internal state of the data shard for debugging and QA.
    ///
    /// We'll be thoughtful about making unnecessary changes, but the **output
    /// of this method needs to be gated from users**, so that it's not subject
    /// to our backward compatibility guarantees.
    ///
    /// TODO: Ideally this would return `impl Serialize` so the caller can do
    /// with it what they like, but that doesn't work in traits yet. The
    /// workaround (an associated type) doesn't work because persist doesn't
    /// want to make the type public. In the meantime, move the `serde_json`
    /// call from the single user into this method.
    async fn inspect_persist_state(&self, id: GlobalId)
    -> Result<serde_json::Value, anyhow::Error>;

    /// Records append-only updates for the given introspection type.
    ///
    /// Rows passed in `updates` MUST have the correct schema for the given
    /// introspection type, as readers rely on this and might panic otherwise.
    fn append_introspection_updates(&mut self, type_: IntrospectionType, updates: Vec<(Row, Diff)>);

    /// Records append-only status updates for the given introspection type.
    fn append_status_introspection_updates(
        &mut self,
        type_: IntrospectionType,
        updates: Vec<StatusUpdate>,
    );

    /// Updates the desired state of the given introspection type.
    ///
    /// Rows passed in `op` MUST have the correct schema for the given
    /// introspection type, as readers rely on this and might panic otherwise.
    fn update_introspection_collection(&mut self, type_: IntrospectionType, op: StorageWriteOp);

    /// Returns a sender for updates to the specified append-only introspection collection.
    ///
    /// # Panics
    ///
    /// Panics if the given introspection type is not associated with an append-only collection.
    fn append_only_introspection_tx(
        &self,
        type_: IntrospectionType,
    ) -> mpsc::UnboundedSender<(
        Vec<AppendOnlyUpdate>,
        oneshot::Sender<Result<(), StorageError<Self::Timestamp>>>,
    )>;

    /// Returns a sender for updates to the specified differential introspection collection.
    ///
    /// # Panics
    ///
    /// Panics if the given introspection type is not associated with a differential collection.
    fn differential_introspection_tx(
        &self,
        type_: IntrospectionType,
    ) -> mpsc::UnboundedSender<(
        StorageWriteOp,
        oneshot::Sender<Result<(), StorageError<Self::Timestamp>>>,
    )>;
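
    /// Returns a future that resolves to a "real-time recent" timestamp for
    /// the given sources, i.e. one at or beyond the upstream systems'
    /// positions as of this call, erroring if no such timestamp can be
    /// determined within `timeout`.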
    async fn real_time_recent_timestamp(
        &self,
        source_ids: BTreeSet<GlobalId>,
        timeout: Duration,
    ) -> Result<
        BoxFuture<Result<Self::Timestamp, StorageError<Self::Timestamp>>>,
        StorageError<Self::Timestamp>,
    >;

    /// Returns the state of the [`StorageController`] formatted as JSON.
    fn dump(&self) -> Result<serde_json::Value, anyhow::Error>;
}

impl<T> DataSource<T> {
    /// Returns true if the storage controller manages the data shard for this
    /// source using txn-wal.
    pub fn in_txns(&self) -> bool {
        match self {
            DataSource::Table => true,
            DataSource::Other
            | DataSource::Ingestion(_)
            | DataSource::IngestionExport { .. }
            | DataSource::Introspection(_)
            | DataSource::Progress
            | DataSource::Webhook
            | DataSource::Sink { .. } => false,
        }
    }
}

/// A wrapper struct that presents the adapter token to a format that is understandable by persist
/// and also allows us to differentiate between a token being present versus being set for the
/// first time.
#[derive(PartialEq, Clone, Debug)]
pub struct PersistEpoch(pub Option<NonZeroI64>);

impl Opaque for PersistEpoch {
    fn initial() -> Self {
        PersistEpoch(None)
    }
}

impl Codec64 for PersistEpoch {
    fn codec_name() -> String {
        "PersistEpoch".to_owned()
    }

    fn encode(&self) -> [u8; 8] {
        self.0.map(NonZeroI64::get).unwrap_or(0).to_le_bytes()
    }

    fn decode(buf: [u8; 8]) -> Self {
        Self(NonZeroI64::new(i64::from_le_bytes(buf)))
    }
}

impl From<NonZeroI64> for PersistEpoch {
    fn from(epoch: NonZeroI64) -> Self {
        Self(Some(epoch))
    }
}

/// State maintained about individual exports.
#[derive(Debug)]
pub struct ExportState<T: TimelyTimestamp> {
    /// Really only for keeping track of changes to the `derived_since`.
    pub read_capabilities: MutableAntichain<T>,

    /// The cluster this export is associated with.
    pub cluster_id: StorageInstanceId,

    /// The current since frontier, derived from `write_frontier` using
    /// `hold_policy`.
    pub derived_since: Antichain<T>,

    /// The read holds that this export has on its dependencies (its input and itself). When
    /// the upper of the export changes, we downgrade this, which in turn
    /// downgrades holds we have on our dependencies' sinces.
    pub read_holds: [ReadHold<T>; 2],

    /// The policy to use to downgrade `self.derived_since`.
    pub read_policy: ReadPolicy<T>,

    /// Reported write frontier.
    pub write_frontier: Antichain<T>,
}

impl<T: Timestamp> ExportState<T> {
    pub fn new(
        cluster_id: StorageInstanceId,
        read_hold: ReadHold<T>,
        self_hold: ReadHold<T>,
        write_frontier: Antichain<T>,
        read_policy: ReadPolicy<T>,
    ) -> Self
    where
        T: Lattice,
    {
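        // The export must not read its dependencies before their sinces, so
        // its initial since is the join (least upper bound) of the sinces of
        // its input hold and its self hold.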
        let mut dependency_since = Antichain::from_elem(T::minimum());
        for read_hold in [&read_hold, &self_hold] {
            dependency_since.join_assign(read_hold.since());
        }
        Self {
            read_capabilities: MutableAntichain::from(dependency_since.borrow()),
            cluster_id,
            derived_since: dependency_since,
            read_holds: [read_hold, self_hold],
            read_policy,
            write_frontier,
        }
    }

    /// Returns the cluster to which the export is bound.
    pub fn cluster_id(&self) -> StorageInstanceId {
        self.cluster_id
    }

    /// Returns the read hold that this export has on its input collection.
    pub fn input_hold(&self) -> &ReadHold<T> {
        &self.read_holds[0]
    }

    /// Returns whether the export was dropped.
    pub fn is_dropped(&self) -> bool {
        self.read_holds.iter().all(|h| h.since().is_empty())
    }
}

/// A channel that allows you to append a set of updates to a pre-defined [`GlobalId`].
///
/// See `CollectionManager::monotonic_appender` to acquire a [`MonotonicAppender`].
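///
/// A hedged usage sketch (hypothetical `controller`, `id`, and `updates`;
/// `ignore`d because acquiring an appender requires a live controller):
///
/// ```ignore
/// let appender = controller.monotonic_appender(id)?;
/// appender.append(updates).await?;
/// ```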
#[derive(Clone, Debug)]
pub struct MonotonicAppender<T> {
    /// Channel that sends to a [`tokio::task`] which pushes updates to Persist.
    tx: mpsc::UnboundedSender<(
        Vec<AppendOnlyUpdate>,
        oneshot::Sender<Result<(), StorageError<T>>>,
    )>,
}

impl<T> MonotonicAppender<T> {
    pub fn new(
        tx: mpsc::UnboundedSender<(
            Vec<AppendOnlyUpdate>,
            oneshot::Sender<Result<(), StorageError<T>>>,
        )>,
    ) -> Self {
        MonotonicAppender { tx }
    }

    pub async fn append(&self, updates: Vec<AppendOnlyUpdate>) -> Result<(), StorageError<T>> {
        let (tx, rx) = oneshot::channel();

        // Send our update to the CollectionManager.
        self.tx
            .send((updates, tx))
            .map_err(|_| StorageError::ShuttingDown("collection manager"))?;

        // Wait for a response; if we fail to receive, the CollectionManager has gone away.
        let result = rx
            .await
            .map_err(|_| StorageError::ShuttingDown("collection manager"))?;

        result
    }
}

/// A wallclock lag measurement.
///
/// The enum representation reflects the fact that wallclock lag is undefined for unreadable
/// collections, i.e. collections that contain no readable times.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum WallclockLag {
    /// Lag value in seconds, for readable collections.
    Seconds(u64),
    /// Undefined lag, for unreadable collections.
    Undefined,
}

impl WallclockLag {
    /// The smallest possible wallclock lag measurement.
    pub const MIN: Self = Self::Seconds(0);

    /// Return the maximum of two lag values.
    ///
    /// We treat `Undefined` lags as greater than `Seconds`, to ensure we never report low lag
    /// values when a collection was actually unreadable for some amount of time.
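    ///
    /// ```
    /// use mz_storage_client::controller::WallclockLag;
    ///
    /// assert_eq!(WallclockLag::Seconds(1).max(WallclockLag::Seconds(5)), WallclockLag::Seconds(5));
    /// assert_eq!(WallclockLag::Seconds(1).max(WallclockLag::Undefined), WallclockLag::Undefined);
    /// ```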
    pub fn max(self, other: Self) -> Self {
        match (self, other) {
            (Self::Seconds(a), Self::Seconds(b)) => Self::Seconds(a.max(b)),
            (Self::Undefined, _) | (_, Self::Undefined) => Self::Undefined,
        }
    }

    /// Return the wrapped seconds value, or a default if the lag is `Undefined`.
    pub fn unwrap_seconds_or(self, default: u64) -> u64 {
        match self {
            Self::Seconds(s) => s,
            Self::Undefined => default,
        }
    }

    /// Create a new `WallclockLag` by transforming the wrapped seconds value.
    pub fn map_seconds(self, f: impl FnOnce(u64) -> u64) -> Self {
        match self {
            Self::Seconds(s) => Self::Seconds(f(s)),
            Self::Undefined => Self::Undefined,
        }
    }

    /// Convert this lag value into a [`Datum::Interval`] or [`Datum::Null`].
    pub fn into_interval_datum(self) -> Datum<'static> {
        match self {
            Self::Seconds(secs) => {
                let micros = i64::try_from(secs * 1_000_000).expect("must fit");
                Datum::Interval(Interval::new(0, 0, micros))
            }
            Self::Undefined => Datum::Null,
        }
    }

    /// Convert this lag value into a [`Datum::UInt64`] or [`Datum::Null`].
    pub fn into_uint64_datum(self) -> Datum<'static> {
        match self {
            Self::Seconds(secs) => Datum::UInt64(secs),
            Self::Undefined => Datum::Null,
        }
    }
}

/// The period covered by a wallclock lag histogram, represented as a `[start, end)` range.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct WallclockLagHistogramPeriod {
    pub start: CheckedTimestamp<DateTime<Utc>>,
    pub end: CheckedTimestamp<DateTime<Utc>>,
}

impl WallclockLagHistogramPeriod {
    /// Construct a `WallclockLagHistogramPeriod` from the given epoch timestamp and dyncfg.
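    ///
    /// Periods are aligned to multiples of the configured interval: for
    /// example, with a 60s interval, an `epoch_ms` of 125_000 yields the
    /// period `[120_000, 180_000)` (in epoch milliseconds).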
    pub fn from_epoch_millis(epoch_ms: u64, dyncfg: &ConfigSet) -> Self {
        let interval = WALLCLOCK_LAG_HISTOGRAM_PERIOD_INTERVAL.get(dyncfg);
        let interval_ms = u64::try_from(interval.as_millis()).unwrap_or_else(|_| {
            soft_panic_or_log!("excessive wallclock lag histogram period interval: {interval:?}");
            let default = WALLCLOCK_LAG_HISTOGRAM_PERIOD_INTERVAL.default();
            u64::try_from(default.as_millis()).unwrap()
        });
        let interval_ms = std::cmp::max(interval_ms, 1);

        let start_ms = epoch_ms - (epoch_ms % interval_ms);
        let start_dt = mz_ore::now::to_datetime(start_ms);
        let start = start_dt.try_into().expect("must fit");

        let end_ms = start_ms + interval_ms;
        let end_dt = mz_ore::now::to_datetime(end_ms);
        let end = end_dt.try_into().expect("must fit");

        Self { start, end }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[mz_ore::test]
    fn lag_writes_by_zero() {
        let policy =
            ReadPolicy::lag_writes_by(mz_repr::Timestamp::default(), mz_repr::Timestamp::default());
        let write_frontier = Antichain::from_elem(mz_repr::Timestamp::from(5));
        assert_eq!(policy.frontier(write_frontier.borrow()), write_frontier);
    }
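
    // A few additional sanity checks, grounded in the definitions above: the
    // `PersistEpoch` codec must round-trip (with `None` encoding as zero),
    // `WallclockLag::max` must treat `Undefined` as the greatest value, and
    // `for_table` must fill in the table defaults. (`RelationDesc::empty()` is
    // assumed to be available, as in `mz_repr`.)
    #[mz_ore::test]
    fn persist_epoch_codec_roundtrip() {
        let some = PersistEpoch(NonZeroI64::new(42));
        assert_eq!(PersistEpoch::decode(some.encode()), some);

        // `None` encodes as zero, which decodes back to `None`.
        let none = PersistEpoch(None);
        assert_eq!(none.encode(), [0; 8]);
        assert_eq!(PersistEpoch::decode(none.encode()), none);
    }

    #[mz_ore::test]
    fn wallclock_lag_max_prefers_undefined() {
        use WallclockLag::*;

        assert_eq!(Seconds(1).max(Seconds(5)), Seconds(5));
        assert_eq!(Seconds(1).max(Undefined), Undefined);
        assert_eq!(Undefined.max(Seconds(5)), Undefined);
        assert_eq!(Undefined.unwrap_seconds_or(7), 7);
    }

    #[mz_ore::test]
    fn for_table_defaults() {
        let desc = CollectionDescription::<mz_repr::Timestamp>::for_table(RelationDesc::empty());
        assert_eq!(desc.timeline, Some(Timeline::EpochMilliseconds));
        assert!(desc.since.is_none());
        assert_eq!(desc.data_source, DataSource::Table);
    }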
}