1#![allow(missing_docs)]
11#![allow(clippy::as_conversions, clippy::clone_on_ref_ptr)]
14
15use std::collections::{BTreeMap, BTreeSet};
18use std::fmt::Debug;
19use std::iter;
20
21use async_trait::async_trait;
22use differential_dataflow::difference::Semigroup;
23use differential_dataflow::lattice::Lattice;
24use mz_cluster_client::ReplicaId;
25use mz_cluster_client::client::TryIntoProtocolNonce;
26use mz_ore::assert_none;
27use mz_persist_client::batch::{BatchBuilder, ProtoBatch};
28use mz_persist_client::write::WriteHandle;
29use mz_persist_types::{Codec, Codec64, StepForward};
30use mz_repr::{Diff, GlobalId, Row, TimestampManipulation};
31use mz_service::client::{GenericClient, Partitionable, PartitionedState};
32use mz_storage_types::controller::CollectionMetadata;
33use mz_storage_types::oneshot_sources::OneshotIngestionRequest;
34use mz_storage_types::parameters::StorageParameters;
35use mz_storage_types::sinks::StorageSinkDesc;
36use mz_storage_types::sources::IngestionDescription;
37use serde::{Deserialize, Serialize};
38use smallvec::SmallVec;
39use timely::PartialOrder;
40use timely::progress::Timestamp;
41use timely::progress::frontier::{Antichain, MutableAntichain};
42use uuid::Uuid;
43
44use crate::statistics::{SinkStatisticsUpdate, SourceStatisticsUpdate};
45
/// A client to the storage layer: anything that can exchange
/// [`StorageCommand`]s for [`StorageResponse`]s.
pub trait StorageClient<T = mz_repr::Timestamp>:
    GenericClient<StorageCommand<T>, StorageResponse<T>>
{
}
51
52impl<C, T> StorageClient<T> for C where C: GenericClient<StorageCommand<T>, StorageResponse<T>> {}
53
#[async_trait]
impl<T: Send> GenericClient<StorageCommand<T>, StorageResponse<T>> for Box<dyn StorageClient<T>> {
    /// Forwards the command to the boxed client.
    async fn send(&mut self, cmd: StorageCommand<T>) -> Result<(), anyhow::Error> {
        (**self).send(cmd).await
    }

    /// Forwards to the boxed client.
    ///
    /// NOTE(review): cancellation safety depends on the underlying client's
    /// `recv` — confirm before using this inside `select!`.
    async fn recv(&mut self) -> Result<Option<StorageResponse<T>>, anyhow::Error> {
        (**self).recv().await
    }
}
70
/// Commands sent to storage workers; timestamp type `T` defaults to
/// [`mz_repr::Timestamp`].
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub enum StorageCommand<T = mz_repr::Timestamp> {
    /// Connection handshake; carries the protocol nonce that the
    /// `TryIntoProtocolNonce` impl below extracts.
    Hello {
        nonce: Uuid,
    },
    /// Signals that the initial set of commands has been sent.
    /// NOTE(review): semantics inferred from the name; confirm with callers.
    InitializationComplete,
    /// Permits the recipient to perform writes.
    /// NOTE(review): semantics inferred from the name; confirm with callers.
    AllowWrites,
    /// Updates storage-related dynamic configuration (boxed: the parameter
    /// struct is large relative to the other variants).
    UpdateConfiguration(Box<StorageParameters>),
    /// Starts (or updates) the described ingestion.
    RunIngestion(Box<RunIngestionCommand>),
    /// Allows compacting the identified collection up to the given frontier.
    AllowCompaction(GlobalId, Antichain<T>),
    /// Starts (or updates) the described sink.
    RunSink(Box<RunSinkCommand<T>>),
    /// Starts a oneshot ingestion; results come back via
    /// `StorageResponse::StagedBatches`.
    RunOneshotIngestion(Box<RunOneshotIngestion>),
    /// Cancels the oneshot ingestion with the given id.
    CancelOneshotIngestion(Uuid),
}
113
114impl<T> StorageCommand<T> {
115 pub fn installs_objects(&self) -> bool {
117 use StorageCommand::*;
118 match self {
119 Hello { .. }
120 | InitializationComplete
121 | AllowWrites
122 | UpdateConfiguration(_)
123 | AllowCompaction(_, _)
124 | CancelOneshotIngestion { .. } => false,
125 RunIngestion(_) | RunSink(_) | RunOneshotIngestion(_) => true,
129 }
130 }
131}
132
/// A command to start (or update) an ingestion.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct RunIngestionCommand {
    /// The id of the collection being ingested.
    pub id: GlobalId,
    /// Description of the ingestion, including the collection metadata of
    /// everything it exports (see `collection_ids()` usage below).
    pub description: IngestionDescription<CollectionMetadata>,
}
142
/// A command to run a oneshot ingestion.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct RunOneshotIngestion {
    /// Unique id for this ingestion; also the key used to cancel it via
    /// `StorageCommand::CancelOneshotIngestion` and to report results in
    /// `StorageResponse::StagedBatches`.
    pub ingestion_id: uuid::Uuid,
    /// The id of the collection the ingestion targets.
    pub collection_id: GlobalId,
    /// Metadata of the target collection.
    pub collection_meta: CollectionMetadata,
    /// The details of the ingestion to run.
    pub request: OneshotIngestionRequest,
}
155
/// A command to start (or update) a sink.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct RunSinkCommand<T> {
    /// The id of the sink.
    pub id: GlobalId,
    /// The description of the sink to run.
    pub description: StorageSinkDesc<CollectionMetadata, T>,
}
162
/// The status of a storage object (see [`StatusUpdate`]).
///
/// String forms are defined by `to_str`/`FromStr` below.
#[derive(Copy, Clone, Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub enum Status {
    Starting,
    Running,
    Paused,
    Stalled,
    Ceased,
    /// Terminal: per `superseded_by`, nothing supersedes `Dropped`.
    Dropped,
}
175
176impl std::str::FromStr for Status {
177 type Err = anyhow::Error;
178 fn from_str(s: &str) -> Result<Self, Self::Err> {
180 Ok(match s {
181 "starting" => Status::Starting,
182 "running" => Status::Running,
183 "paused" => Status::Paused,
184 "stalled" => Status::Stalled,
185 "ceased" => Status::Ceased,
186 "dropped" => Status::Dropped,
187 s => return Err(anyhow::anyhow!("{} is not a valid status", s)),
188 })
189 }
190}
191
192impl Status {
193 pub fn to_str(&self) -> &'static str {
195 match self {
196 Status::Starting => "starting",
197 Status::Running => "running",
198 Status::Paused => "paused",
199 Status::Stalled => "stalled",
200 Status::Ceased => "ceased",
201 Status::Dropped => "dropped",
202 }
203 }
204
205 pub fn superseded_by(self, new: Status) -> bool {
208 match (self, new) {
209 (_, Status::Dropped) => true,
210 (Status::Dropped, _) => false,
211 (Status::Paused, Status::Paused) => false,
213 _ => true,
216 }
217 }
218}
219
/// A status update for a storage object; rendered into a status-history
/// `Row` by the `From<StatusUpdate> for Row` impl below.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct StatusUpdate {
    /// The id of the object this update concerns.
    pub id: GlobalId,
    /// The new status.
    pub status: Status,
    /// When the status change occurred.
    pub timestamp: chrono::DateTime<chrono::Utc>,
    /// Optional error message.
    pub error: Option<String>,
    /// Hints, rendered under the `"hints"` key of the details column.
    pub hints: BTreeSet<String>,
    /// Namespaced errors, rendered under the `"namespaced"` key.
    pub namespaced_errors: BTreeMap<String, String>,
    /// The replica the update originates from, if any.
    pub replica_id: Option<ReplicaId>,
}
235
236impl StatusUpdate {
237 pub fn new(
238 id: GlobalId,
239 timestamp: chrono::DateTime<chrono::Utc>,
240 status: Status,
241 ) -> StatusUpdate {
242 StatusUpdate {
243 id,
244 timestamp,
245 status,
246 error: None,
247 hints: Default::default(),
248 namespaced_errors: Default::default(),
249 replica_id: None,
250 }
251 }
252}
253
impl From<StatusUpdate> for Row {
    /// Packs a status update into a `Row` with column order: timestamp,
    /// object id, status string, error, details (map or NULL), replica id
    /// (string or NULL).
    fn from(update: StatusUpdate) -> Self {
        use mz_repr::Datum;

        let timestamp = Datum::TimestampTz(update.timestamp.try_into().expect("must fit"));
        let id = update.id.to_string();
        let id = Datum::String(&id);
        let status = Datum::String(update.status.to_str());
        let error = update.error.as_deref().into();

        let mut row = Row::default();
        let mut packer = row.packer();
        packer.extend([timestamp, id, status, error]);

        // Details column: a map with the present sub-keys, or NULL if there
        // is nothing to report.
        if !update.hints.is_empty() || !update.namespaced_errors.is_empty() {
            packer.push_dict_with(|dict_packer| {
                // Keys are pushed in sorted order ("hints" < "namespaced").
                if !update.hints.is_empty() {
                    dict_packer.push(Datum::String("hints"));
                    dict_packer.push_list(update.hints.iter().map(|s| Datum::String(s)));
                }
                if !update.namespaced_errors.is_empty() {
                    dict_packer.push(Datum::String("namespaced"));
                    dict_packer.push_dict(
                        update
                            .namespaced_errors
                            .iter()
                            .map(|(k, v)| (k.as_str(), Datum::String(v))),
                    );
                }
            });
        } else {
            packer.push(Datum::Null);
        }

        // Replica id column: string form or NULL.
        match update.replica_id {
            Some(id) => packer.push(Datum::String(&id.to_string())),
            None => packer.push(Datum::Null),
        }

        row
    }
}
298
/// An update to an append-only collection: either a raw row update or a
/// status update that can be rendered into one (see `into_row`).
pub enum AppendOnlyUpdate {
    Row((Row, Diff)),
    Status(StatusUpdate),
}
304
305impl AppendOnlyUpdate {
306 pub fn into_row(self) -> (Row, Diff) {
307 match self {
308 AppendOnlyUpdate::Row((row, diff)) => (row, diff),
309 AppendOnlyUpdate::Status(status) => (Row::from(status), Diff::ONE),
310 }
311 }
312}
313
/// Wraps a raw row update.
impl From<(Row, Diff)> for AppendOnlyUpdate {
    fn from((row, diff): (Row, Diff)) -> Self {
        Self::Row((row, diff))
    }
}
319
/// Wraps a status update.
impl From<StatusUpdate> for AppendOnlyUpdate {
    fn from(update: StatusUpdate) -> Self {
        Self::Status(update)
    }
}
325
/// Responses sent back from storage workers.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub enum StorageResponse<T = mz_repr::Timestamp> {
    /// A new upper frontier for the identified collection.
    FrontierUpper(GlobalId, Antichain<T>),
    /// The identified object has been dropped.
    DroppedId(GlobalId),
    /// Results of oneshot ingestions, keyed by ingestion id
    /// (cf. `RunOneshotIngestion::ingestion_id`).
    StagedBatches(BTreeMap<uuid::Uuid, Vec<Result<ProtoBatch, String>>>),
    /// Source and sink statistics.
    StatisticsUpdates(Vec<SourceStatisticsUpdate>, Vec<SinkStatisticsUpdate>),
    /// A status change for a storage object.
    StatusUpdate(StatusUpdate),
}
341
/// State for merging the storage protocol across multiple partitions
/// (one per shard/worker).
#[derive(Debug)]
pub struct PartitionedStorageState<T> {
    /// Number of partitions the protocol is fanned out over.
    parts: usize,
    /// Per-collection upper tracking: the aggregate frontier across all
    /// partitions, plus each partition's individual frontier (`None` once
    /// that partition reported the collection dropped).
    uppers: BTreeMap<GlobalId, (MutableAntichain<T>, Vec<Option<Antichain<T>>>)>,
    /// Oneshot-ingestion results buffered per ingestion and partition until
    /// every partition has responded.
    oneshot_source_responses:
        BTreeMap<uuid::Uuid, BTreeMap<usize, Vec<Result<ProtoBatch, String>>>>,
}
357
impl<T> Partitionable<StorageCommand<T>, StorageResponse<T>>
    for (StorageCommand<T>, StorageResponse<T>)
where
    T: timely::progress::Timestamp + Lattice,
{
    type PartitionedState = PartitionedStorageState<T>;

    /// Creates empty partitioned state for a protocol fanned out over
    /// `parts` partitions.
    fn new(parts: usize) -> PartitionedStorageState<T> {
        PartitionedStorageState {
            parts,
            uppers: BTreeMap::new(),
            oneshot_source_responses: BTreeMap::new(),
        }
    }
}
373
374impl<T> PartitionedStorageState<T>
375where
376 T: timely::progress::Timestamp,
377{
378 fn observe_command(&mut self, command: &StorageCommand<T>) {
379 let _ = match command {
386 StorageCommand::Hello { .. } => {}
387 StorageCommand::RunIngestion(ingestion) => {
388 self.insert_new_uppers(ingestion.description.collection_ids());
389 }
390 StorageCommand::RunSink(export) => {
391 self.insert_new_uppers([export.id]);
392 }
393 StorageCommand::InitializationComplete
394 | StorageCommand::AllowWrites
395 | StorageCommand::UpdateConfiguration(_)
396 | StorageCommand::AllowCompaction(_, _)
397 | StorageCommand::RunOneshotIngestion(_)
398 | StorageCommand::CancelOneshotIngestion { .. } => {}
399 };
400 }
401
402 fn insert_new_uppers<I: IntoIterator<Item = GlobalId>>(&mut self, ids: I) {
408 for id in ids {
409 self.uppers.entry(id).or_insert_with(|| {
410 let mut frontier = MutableAntichain::new();
411 #[allow(clippy::as_conversions)]
414 frontier.update_iter(iter::once((T::minimum(), self.parts as i64)));
415 let part_frontiers = vec![Some(Antichain::from_elem(T::minimum())); self.parts];
416
417 (frontier, part_frontiers)
418 });
419 }
420 }
421}
422
impl<T> PartitionedState<StorageCommand<T>, StorageResponse<T>> for PartitionedStorageState<T>
where
    T: timely::progress::Timestamp + Lattice,
{
    /// Observes the command (to set up upper tracking) and then broadcasts
    /// it unchanged to every partition.
    fn split_command(&mut self, command: StorageCommand<T>) -> Vec<Option<StorageCommand<T>>> {
        self.observe_command(&command);

        vec![Some(command); self.parts]
    }

    /// Absorbs one partition's response, returning a consolidated response
    /// when warranted and `None` while still waiting on other partitions.
    fn absorb_response(
        &mut self,
        shard_id: usize,
        response: StorageResponse<T>,
    ) -> Option<Result<StorageResponse<T>, anyhow::Error>> {
        match response {
            StorageResponse::FrontierUpper(id, new_shard_upper) => {
                // Fold this shard's new upper into the aggregate frontier and
                // report only if the overall frontier advanced.
                let (frontier, shard_frontiers) = match self.uppers.get_mut(&id) {
                    Some(value) => value,
                    None => panic!("Reference to absent collection: {id}"),
                };
                let old_upper = frontier.frontier().to_owned();
                let shard_upper = match &mut shard_frontiers[shard_id] {
                    Some(shard_upper) => shard_upper,
                    None => panic!("Reference to absent shard {shard_id} for collection {id}"),
                };
                // Retract the shard's previous contribution, insert the new
                // one, then remember the joined per-shard frontier.
                frontier.update_iter(shard_upper.iter().map(|t| (t.clone(), -1)));
                frontier.update_iter(new_shard_upper.iter().map(|t| (t.clone(), 1)));
                shard_upper.join_assign(&new_shard_upper);

                let new_upper = frontier.frontier();
                if PartialOrder::less_than(&old_upper.borrow(), &new_upper) {
                    Some(Ok(StorageResponse::FrontierUpper(id, new_upper.to_owned())))
                } else {
                    None
                }
            }
            StorageResponse::DroppedId(id) => {
                // Record this shard's drop; forward the response (and release
                // the tracking state) only once every shard has reported it.
                let (_, shard_frontiers) = match self.uppers.get_mut(&id) {
                    Some(value) => value,
                    None => panic!("Reference to absent collection: {id}"),
                };
                let prev = shard_frontiers[shard_id].take();
                assert!(
                    prev.is_some(),
                    "got double drop for {id} from shard {shard_id}"
                );

                if shard_frontiers.iter().all(Option::is_none) {
                    self.uppers.remove(&id);
                    Some(Ok(StorageResponse::DroppedId(id)))
                } else {
                    None
                }
            }
            StorageResponse::StatisticsUpdates(source_stats, sink_stats) => {
                // Statistics are forwarded per shard without consolidation.
                Some(Ok(StorageResponse::StatisticsUpdates(
                    source_stats,
                    sink_stats,
                )))
            }
            StorageResponse::StatusUpdate(updates) => {
                // Status updates pass through unchanged.
                Some(Ok(StorageResponse::StatusUpdate(updates)))
            }
            StorageResponse::StagedBatches(batches) => {
                // Buffer per-shard batch results; an ingestion's batches are
                // released only after all `parts` shards have responded.
                let mut finished_batches = BTreeMap::new();

                for (collection_id, batches) in batches {
                    tracing::info!(%shard_id, %collection_id, "got batch");

                    let entry = self
                        .oneshot_source_responses
                        .entry(collection_id)
                        .or_default();
                    let novel = entry.insert(shard_id, batches);
                    assert_none!(novel, "Duplicate oneshot source response");

                    if entry.len() == self.parts {
                        // All shards reported: drain the buffer and flatten
                        // the per-shard batch lists into one.
                        let entry = self
                            .oneshot_source_responses
                            .remove(&collection_id)
                            .expect("checked above");
                        let all_batches: Vec<_> = entry.into_values().flatten().collect();

                        finished_batches.insert(collection_id, all_batches);
                    }
                }

                if !finished_batches.is_empty() {
                    Some(Ok(StorageResponse::StagedBatches(finished_batches)))
                } else {
                    None
                }
            }
        }
    }
}
527
/// A single row update at an explicit timestamp.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct Update<T = mz_repr::Timestamp> {
    pub row: Row,
    pub timestamp: T,
    pub diff: Diff,
}
535
/// A row update carrying no timestamp.
/// NOTE(review): presumably the timestamp is assigned at write time, as in
/// `TimestamplessUpdateBuilder` below — confirm with callers.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct TimestamplessUpdate {
    pub row: Row,
    pub diff: Diff,
}
545
/// Data destined for a table, in one of two forms.
#[derive(Debug, Clone, PartialEq)]
pub enum TableData {
    /// In-memory row updates.
    Rows(Vec<(Row, Diff)>),
    /// Batches already staged in persist, transmittable as protos. The
    /// `SmallVec` keeps the common single-batch case inline.
    Batches(SmallVec<[ProtoBatch; 1]>),
}
555
556impl TableData {
557 pub fn is_empty(&self) -> bool {
558 match self {
559 TableData::Rows(rows) => rows.is_empty(),
560 TableData::Batches(batches) => batches.is_empty(),
561 }
562 }
563}
564
/// Builds a persist batch from updates that carry no timestamp of their own;
/// every update is staged at a single fixed timestamp (see `new`).
pub struct TimestamplessUpdateBuilder<K, V, T, D>
where
    K: Codec,
    V: Codec,
    T: Timestamp + Lattice + Codec64,
    D: Codec64,
{
    /// The underlying persist batch builder.
    builder: BatchBuilder<K, V, T, D>,
    /// The single timestamp all staged updates are written at.
    initial_ts: T,
}
577
impl<K, V, T, D> TimestamplessUpdateBuilder<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: TimestampManipulation + Lattice + Codec64 + Sync,
    D: Semigroup + Ord + Codec64 + Send + Sync,
{
    /// Creates a builder backed by `handle`, staging all updates at
    /// `T::minimum()`.
    pub fn new(handle: &WriteHandle<K, V, T, D>) -> Self {
        let initial_ts = T::minimum();
        let builder = handle.builder(Antichain::from_elem(initial_ts.clone()));
        TimestamplessUpdateBuilder {
            builder,
            initial_ts,
        }
    }

    /// Stages the update `(k, v, d)` at the builder's fixed timestamp.
    ///
    /// Panics on error: a failure here indicates invalid persist usage,
    /// i.e. a bug rather than a recoverable condition.
    pub async fn add(&mut self, k: &K, v: &V, d: &D) {
        self.builder
            .add(k, v, &self.initial_ts, d)
            .await
            .expect("invalid Persist usage");
    }

    /// Completes the batch with an upper one step past the staged timestamp
    /// and returns it in transmittable (proto) form.
    pub async fn finish(self) -> ProtoBatch {
        // Upper must be strictly beyond the staged updates' timestamp for
        // the batch to contain them.
        let finish_ts = StepForward::step_forward(&self.initial_ts);
        let batch = self
            .builder
            .finish(Antichain::from_elem(finish_ts))
            .await
            .expect("invalid Persist usage");

        batch.into_transmittable_batch()
    }
}
619
620impl TryIntoProtocolNonce for StorageCommand {
621 fn try_into_protocol_nonce(self) -> Result<Uuid, Self> {
622 match self {
623 StorageCommand::Hello { nonce } => Ok(nonce),
624 cmd => Err(cmd),
625 }
626 }
627}
628
#[cfg(test)]
mod tests {
    use super::*;

    // These tests pin the in-memory size of the protocol enums so that
    // accidental growth (e.g. un-boxing a large variant) is caught at
    // compile/test time rather than silently bloating every message.

    #[mz_ore::test]
    fn test_storage_command_size() {
        assert_eq!(std::mem::size_of::<StorageCommand>(), 40);
    }

    #[mz_ore::test]
    fn test_storage_response_size() {
        assert_eq!(std::mem::size_of::<StorageResponse>(), 120);
    }
}