1#![allow(missing_docs)]
11
12use std::collections::{BTreeMap, BTreeSet};
15use std::fmt::Debug;
16use std::iter;
17
18use async_trait::async_trait;
19use differential_dataflow::difference::Monoid;
20use differential_dataflow::lattice::Lattice;
21use mz_cluster_client::ReplicaId;
22use mz_cluster_client::client::TryIntoProtocolNonce;
23use mz_ore::assert_none;
24use mz_persist_client::batch::{BatchBuilder, ProtoBatch};
25use mz_persist_client::write::WriteHandle;
26use mz_persist_types::{Codec, Codec64, StepForward};
27use mz_repr::{Diff, GlobalId, Row, TimestampManipulation};
28use mz_service::client::{GenericClient, Partitionable, PartitionedState};
29use mz_storage_types::controller::CollectionMetadata;
30use mz_storage_types::oneshot_sources::OneshotIngestionRequest;
31use mz_storage_types::parameters::StorageParameters;
32use mz_storage_types::sinks::StorageSinkDesc;
33use mz_storage_types::sources::IngestionDescription;
34use serde::{Deserialize, Serialize};
35use smallvec::SmallVec;
36use timely::PartialOrder;
37use timely::progress::Timestamp;
38use timely::progress::frontier::{Antichain, MutableAntichain};
39use uuid::Uuid;
40
41use crate::statistics::{SinkStatisticsUpdate, SourceStatisticsUpdate};
42
/// A convenience alias: a client that speaks the storage protocol, i.e.
/// sends [`StorageCommand`]s and receives [`StorageResponse`]s.
pub trait StorageClient<T = mz_repr::Timestamp>:
    GenericClient<StorageCommand<T>, StorageResponse<T>>
{
}

/// Blanket impl: anything implementing the underlying [`GenericClient`]
/// automatically counts as a [`StorageClient`].
impl<C, T> StorageClient<T> for C where C: GenericClient<StorageCommand<T>, StorageResponse<T>> {}
50
/// Forwarding impl so a boxed trait object can itself be used wherever a
/// concrete client is expected. Both methods delegate to the inner client.
#[async_trait]
impl<T: Send> GenericClient<StorageCommand<T>, StorageResponse<T>> for Box<dyn StorageClient<T>> {
    async fn send(&mut self, cmd: StorageCommand<T>) -> Result<(), anyhow::Error> {
        (**self).send(cmd).await
    }

    /// # Cancel safety
    ///
    /// Pure delegation: cancel safety is whatever the inner client's `recv`
    /// provides.
    async fn recv(&mut self) -> Result<Option<StorageResponse<T>>, anyhow::Error> {
        (**self).recv().await
    }
}
67
/// The commands sent over a [`StorageClient`] connection.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub enum StorageCommand<T = mz_repr::Timestamp> {
    /// Handshake message carrying the protocol nonce; extracted by the
    /// [`TryIntoProtocolNonce`] impl at the bottom of this file.
    Hello {
        nonce: Uuid,
    },
    /// Signals that the initial command stream is complete.
    /// NOTE(review): semantics inferred from the name — confirm with the
    /// server-side handler.
    InitializationComplete,
    /// Permits the server to perform writes.
    /// NOTE(review): semantics inferred from the name — confirm.
    AllowWrites,
    /// Updates dynamic configuration with the given parameters.
    UpdateConfiguration(Box<StorageParameters>),
    /// Installs (or updates) the described ingestion.
    RunIngestion(Box<RunIngestionCommand>),
    /// Permits compaction of the identified collection up to the given
    /// frontier.
    AllowCompaction(GlobalId, Antichain<T>),
    /// Installs the described sink.
    RunSink(Box<RunSinkCommand<T>>),
    /// Installs the described oneshot ingestion.
    RunOneshotIngestion(Box<RunOneshotIngestion>),
    /// Cancels the oneshot ingestion with the given id.
    CancelOneshotIngestion(Uuid),
}
110
111impl<T> StorageCommand<T> {
112 pub fn installs_objects(&self) -> bool {
114 use StorageCommand::*;
115 match self {
116 Hello { .. }
117 | InitializationComplete
118 | AllowWrites
119 | UpdateConfiguration(_)
120 | AllowCompaction(_, _)
121 | CancelOneshotIngestion { .. } => false,
122 RunIngestion(_) | RunSink(_) | RunOneshotIngestion(_) => true,
126 }
127 }
128}
129
/// Instruction to run (or update) an ingestion.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct RunIngestionCommand {
    /// The id of the ingestion to run.
    pub id: GlobalId,
    /// The full description of the ingestion, including the metadata of the
    /// collections it writes to.
    pub description: IngestionDescription<CollectionMetadata>,
}
139
/// Instruction to run a oneshot ingestion that stages batches for a
/// collection (see `StorageResponse::StagedBatches` for the reply path).
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct RunOneshotIngestion {
    /// Identifier for this ingestion; also used to cancel it and to key the
    /// staged-batch responses.
    pub ingestion_id: uuid::Uuid,
    /// The id of the collection the batches are destined for.
    pub collection_id: GlobalId,
    /// Metadata of the destination collection.
    pub collection_meta: CollectionMetadata,
    /// Description of what to ingest.
    pub request: OneshotIngestionRequest,
}
152
/// Instruction to run (or update) a sink.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct RunSinkCommand<T> {
    /// The id of the sink to run.
    pub id: GlobalId,
    /// The full description of the sink, including the metadata of the
    /// collection it reads from.
    pub description: StorageSinkDesc<CollectionMetadata, T>,
}
159
/// The status of an object tracked by the storage layer. The string forms
/// below (see `to_str`/`FromStr`) are the wire/storage representation.
#[derive(
    Copy,
    Clone,
    Debug,
    Serialize,
    Deserialize,
    PartialEq,
    Eq,
    PartialOrd,
    Ord
)]
pub enum Status {
    /// The object is starting up.
    Starting,
    /// The object is running.
    Running,
    /// The object is paused.
    Paused,
    /// The object is stalled.
    Stalled,
    /// The object has ceased.
    /// NOTE(review): distinction from `Stalled`/`Dropped` not visible here —
    /// confirm with the status-collection docs.
    Ceased,
    /// The object has been dropped; terminal (see `superseded_by`).
    Dropped,
}
182
183impl std::str::FromStr for Status {
184 type Err = anyhow::Error;
185 fn from_str(s: &str) -> Result<Self, Self::Err> {
187 Ok(match s {
188 "starting" => Status::Starting,
189 "running" => Status::Running,
190 "paused" => Status::Paused,
191 "stalled" => Status::Stalled,
192 "ceased" => Status::Ceased,
193 "dropped" => Status::Dropped,
194 s => return Err(anyhow::anyhow!("{} is not a valid status", s)),
195 })
196 }
197}
198
199impl Status {
200 pub fn to_str(&self) -> &'static str {
202 match self {
203 Status::Starting => "starting",
204 Status::Running => "running",
205 Status::Paused => "paused",
206 Status::Stalled => "stalled",
207 Status::Ceased => "ceased",
208 Status::Dropped => "dropped",
209 }
210 }
211
212 pub fn superseded_by(self, new: Status) -> bool {
215 match (self, new) {
216 (_, Status::Dropped) => true,
217 (Status::Dropped, _) => false,
218 (Status::Paused, Status::Paused) => false,
220 _ => true,
223 }
224 }
225}
226
/// A single status report for a storage object, convertible into a [`Row`]
/// (see the `From` impl below for the resulting column layout).
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct StatusUpdate {
    /// The id of the object the update concerns.
    pub id: GlobalId,
    /// The new status.
    pub status: Status,
    /// Wall-clock time of the status change.
    pub timestamp: chrono::DateTime<chrono::Utc>,
    /// Optional error message accompanying the status.
    pub error: Option<String>,
    /// Optional hints for the user; rendered under the "hints" key of the
    /// details column.
    pub hints: BTreeSet<String>,
    /// Per-namespace error details; rendered under the "namespaced" key of
    /// the details column.
    pub namespaced_errors: BTreeMap<String, String>,
    /// The replica that produced the update, if any.
    pub replica_id: Option<ReplicaId>,
}
242
243impl StatusUpdate {
244 pub fn new(
245 id: GlobalId,
246 timestamp: chrono::DateTime<chrono::Utc>,
247 status: Status,
248 ) -> StatusUpdate {
249 StatusUpdate {
250 id,
251 timestamp,
252 status,
253 error: None,
254 hints: Default::default(),
255 namespaced_errors: Default::default(),
256 replica_id: None,
257 }
258 }
259}
260
/// Renders a status update as a row with columns
/// `(timestamp, id, status, error, details, replica_id)`, where `details`
/// is a map holding optional "hints" and "namespaced" entries (or NULL when
/// both are empty).
impl From<StatusUpdate> for Row {
    fn from(update: StatusUpdate) -> Self {
        use mz_repr::Datum;

        let timestamp = Datum::TimestampTz(update.timestamp.try_into().expect("must fit"));
        let id = update.id.to_string();
        let id = Datum::String(&id);
        let status = Datum::String(update.status.to_str());
        let error = update.error.as_deref().into();

        let mut row = Row::default();
        let mut packer = row.packer();
        packer.extend([timestamp, id, status, error]);

        // The details column: a map that is only present when there is
        // something to put in it, NULL otherwise.
        if !update.hints.is_empty() || !update.namespaced_errors.is_empty() {
            packer.push_dict_with(|dict_packer| {
                // `BTreeSet`/`BTreeMap` iteration gives deterministic key
                // order within each entry.
                if !update.hints.is_empty() {
                    dict_packer.push(Datum::String("hints"));
                    dict_packer.push_list(update.hints.iter().map(|s| Datum::String(s)));
                }
                if !update.namespaced_errors.is_empty() {
                    dict_packer.push(Datum::String("namespaced"));
                    dict_packer.push_dict(
                        update
                            .namespaced_errors
                            .iter()
                            .map(|(k, v)| (k.as_str(), Datum::String(v))),
                    );
                }
            });
        } else {
            packer.push(Datum::Null);
        }

        // Final column: the reporting replica, if known.
        match update.replica_id {
            Some(id) => packer.push(Datum::String(&id.to_string())),
            None => packer.push(Datum::Null),
        }

        row
    }
}
305
/// An update destined for an append-only collection: either a raw
/// `(Row, Diff)` pair or a status update that renders to one.
pub enum AppendOnlyUpdate {
    /// A raw row with its diff.
    Row((Row, Diff)),
    /// A status update, rendered to a row with diff +1 by `into_row`.
    Status(StatusUpdate),
}
311
312impl AppendOnlyUpdate {
313 pub fn into_row(self) -> (Row, Diff) {
314 match self {
315 AppendOnlyUpdate::Row((row, diff)) => (row, diff),
316 AppendOnlyUpdate::Status(status) => (Row::from(status), Diff::ONE),
317 }
318 }
319}
320
/// Wraps a plain `(Row, Diff)` pair as [`AppendOnlyUpdate::Row`].
impl From<(Row, Diff)> for AppendOnlyUpdate {
    fn from((row, diff): (Row, Diff)) -> Self {
        Self::Row((row, diff))
    }
}

/// Wraps a [`StatusUpdate`] as [`AppendOnlyUpdate::Status`].
impl From<StatusUpdate> for AppendOnlyUpdate {
    fn from(update: StatusUpdate) -> Self {
        Self::Status(update)
    }
}
332
/// Responses sent back over a [`StorageClient`] connection.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub enum StorageResponse<T = mz_repr::Timestamp> {
    /// The upper frontier of the identified collection has advanced.
    FrontierUpper(GlobalId, Antichain<T>),
    /// The identified collection has been fully dropped.
    DroppedId(GlobalId),
    /// Batches staged by oneshot ingestions, keyed by the ingestion's uuid
    /// (cf. [`RunOneshotIngestion::ingestion_id`]).
    StagedBatches(BTreeMap<uuid::Uuid, Vec<Result<ProtoBatch, String>>>),
    /// Source and sink statistics updates.
    StatisticsUpdates(Vec<SourceStatisticsUpdate>, Vec<SinkStatisticsUpdate>),
    /// A status change for a storage object.
    StatusUpdate(StatusUpdate),
}
348
/// State for merging the responses of `parts` storage shards into a single
/// logical response stream (see the `PartitionedState` impl below).
#[derive(Debug)]
pub struct PartitionedStorageState<T> {
    /// The number of parts (shards) commands are split across.
    parts: usize,
    /// Per collection: the combined upper frontier across all shards, plus
    /// each shard's individual upper (`None` once the shard reported a drop).
    uppers: BTreeMap<GlobalId, (MutableAntichain<T>, Vec<Option<Antichain<T>>>)>,
    /// Staged oneshot-ingestion batches collected per shard, keyed by
    /// ingestion uuid, held until every shard has reported.
    oneshot_source_responses:
        BTreeMap<uuid::Uuid, BTreeMap<usize, Vec<Result<ProtoBatch, String>>>>,
}
364
impl<T> Partitionable<StorageCommand<T>, StorageResponse<T>>
    for (StorageCommand<T>, StorageResponse<T>)
where
    T: timely::progress::Timestamp + Lattice,
{
    type PartitionedState = PartitionedStorageState<T>;

    /// Creates empty partition-merging state for `parts` shards.
    fn new(parts: usize) -> PartitionedStorageState<T> {
        PartitionedStorageState {
            parts,
            uppers: BTreeMap::new(),
            oneshot_source_responses: BTreeMap::new(),
        }
    }
}
380
381impl<T> PartitionedStorageState<T>
382where
383 T: timely::progress::Timestamp,
384{
385 fn observe_command(&mut self, command: &StorageCommand<T>) {
386 let _ = match command {
393 StorageCommand::Hello { .. } => {}
394 StorageCommand::RunIngestion(ingestion) => {
395 self.insert_new_uppers(ingestion.description.collection_ids());
396 }
397 StorageCommand::RunSink(export) => {
398 self.insert_new_uppers([export.id]);
399 }
400 StorageCommand::InitializationComplete
401 | StorageCommand::AllowWrites
402 | StorageCommand::UpdateConfiguration(_)
403 | StorageCommand::AllowCompaction(_, _)
404 | StorageCommand::RunOneshotIngestion(_)
405 | StorageCommand::CancelOneshotIngestion { .. } => {}
406 };
407 }
408
409 fn insert_new_uppers<I: IntoIterator<Item = GlobalId>>(&mut self, ids: I) {
415 for id in ids {
416 self.uppers.entry(id).or_insert_with(|| {
417 let mut frontier = MutableAntichain::new();
418 #[allow(clippy::as_conversions)]
421 frontier.update_iter(iter::once((T::minimum(), self.parts as i64)));
422 let part_frontiers = vec![Some(Antichain::from_elem(T::minimum())); self.parts];
423
424 (frontier, part_frontiers)
425 });
426 }
427 }
428}
429
impl<T> PartitionedState<StorageCommand<T>, StorageResponse<T>> for PartitionedStorageState<T>
where
    T: timely::progress::Timestamp + Lattice,
{
    /// Broadcasts `command` unchanged to every part, after recording any
    /// frontier tracking it requires (see `observe_command`).
    fn split_command(&mut self, command: StorageCommand<T>) -> Vec<Option<StorageCommand<T>>> {
        self.observe_command(&command);

        vec![Some(command); self.parts]
    }

    /// Absorbs one shard's response, emitting a consolidated response only
    /// when it is ready to be surfaced (frontier advanced, all shards
    /// dropped an id, all shards staged their batches, ...).
    fn absorb_response(
        &mut self,
        shard_id: usize,
        response: StorageResponse<T>,
    ) -> Option<Result<StorageResponse<T>, anyhow::Error>> {
        match response {
            StorageResponse::FrontierUpper(id, new_shard_upper) => {
                // A response for a collection we are not tracking is a
                // protocol violation.
                let (frontier, shard_frontiers) = match self.uppers.get_mut(&id) {
                    Some(value) => value,
                    None => panic!("Reference to absent collection: {id}"),
                };
                let old_upper = frontier.frontier().to_owned();
                let shard_upper = match &mut shard_frontiers[shard_id] {
                    Some(shard_upper) => shard_upper,
                    None => panic!("Reference to absent shard {shard_id} for collection {id}"),
                };
                // Swap this shard's contribution to the combined frontier
                // from its old upper to the (joined) new upper. The join
                // means per-shard uppers only ever advance.
                frontier.update_iter(shard_upper.iter().map(|t| (t.clone(), -1)));
                frontier.update_iter(new_shard_upper.iter().map(|t| (t.clone(), 1)));
                shard_upper.join_assign(&new_shard_upper);

                // Only report upstream when the combined upper actually
                // advanced.
                let new_upper = frontier.frontier();
                if PartialOrder::less_than(&old_upper.borrow(), &new_upper) {
                    Some(Ok(StorageResponse::FrontierUpper(id, new_upper.to_owned())))
                } else {
                    None
                }
            }
            StorageResponse::DroppedId(id) => {
                let (_, shard_frontiers) = match self.uppers.get_mut(&id) {
                    Some(value) => value,
                    None => panic!("Reference to absent collection: {id}"),
                };
                // `None` in the per-shard slot marks "this shard reported
                // the drop"; a second report is a bug.
                let prev = shard_frontiers[shard_id].take();
                assert!(
                    prev.is_some(),
                    "got double drop for {id} from shard {shard_id}"
                );

                // Surface the drop only once every shard has reported it,
                // and retire the tracking state.
                if shard_frontiers.iter().all(Option::is_none) {
                    self.uppers.remove(&id);
                    Some(Ok(StorageResponse::DroppedId(id)))
                } else {
                    None
                }
            }
            StorageResponse::StatisticsUpdates(source_stats, sink_stats) => {
                // Statistics are passed through per shard without merging.
                Some(Ok(StorageResponse::StatisticsUpdates(
                    source_stats,
                    sink_stats,
                )))
            }
            StorageResponse::StatusUpdate(updates) => {
                // Status updates are likewise passed through unmodified.
                Some(Ok(StorageResponse::StatusUpdate(updates)))
            }
            StorageResponse::StagedBatches(batches) => {
                let mut finished_batches = BTreeMap::new();

                for (collection_id, batches) in batches {
                    tracing::info!(%shard_id, %collection_id, "got batch");

                    // Stash this shard's batches; each shard may report a
                    // given ingestion at most once.
                    let entry = self
                        .oneshot_source_responses
                        .entry(collection_id)
                        .or_default();
                    let novel = entry.insert(shard_id, batches);
                    assert_none!(novel, "Duplicate oneshot source response");

                    // Once every part has reported, flatten the per-shard
                    // batch lists and emit them together.
                    if entry.len() == self.parts {
                        let entry = self
                            .oneshot_source_responses
                            .remove(&collection_id)
                            .expect("checked above");
                        let all_batches: Vec<_> = entry.into_values().flatten().collect();

                        finished_batches.insert(collection_id, all_batches);
                    }
                }

                if !finished_batches.is_empty() {
                    Some(Ok(StorageResponse::StagedBatches(finished_batches)))
                } else {
                    None
                }
            }
        }
    }
}
534
/// A single timestamped update to a collection.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct Update<T = mz_repr::Timestamp> {
    /// The row being updated.
    pub row: Row,
    /// The time at which the update takes effect.
    pub timestamp: T,
    /// The change in multiplicity of `row`.
    pub diff: Diff,
}
542
/// An update without a timestamp; the timestamp is chosen by whoever
/// eventually writes it down.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct TimestamplessUpdate {
    /// The row being updated.
    pub row: Row,
    /// The change in multiplicity of `row`.
    pub diff: Diff,
}
552
/// Data for a table write, either as in-memory rows or as already-staged
/// persist batches.
#[derive(Debug, Clone, PartialEq)]
pub enum TableData {
    /// Rows with their diffs, to be written directly.
    Rows(Vec<(Row, Diff)>),
    /// Batches already staged in persist, to be appended. The inline
    /// capacity of 1 reflects that a single batch is the common case.
    Batches(SmallVec<[ProtoBatch; 1]>),
}
562
563impl TableData {
564 pub fn is_empty(&self) -> bool {
565 match self {
566 TableData::Rows(rows) => rows.is_empty(),
567 TableData::Batches(batches) => batches.is_empty(),
568 }
569 }
570}
571
/// A builder for a persist batch whose updates all share a single, fixed
/// timestamp (`T::minimum()`), to be re-timestamped when appended.
pub struct TimestamplessUpdateBuilder<K, V, T, D>
where
    K: Codec,
    V: Codec,
    T: Timestamp + Lattice + Codec64,
    D: Codec64,
{
    // The underlying persist batch builder.
    builder: BatchBuilder<K, V, T, D>,
    // The fixed timestamp every update is written at; also determines the
    // batch's upper in `finish` (one step forward).
    initial_ts: T,
}
584
impl<K, V, T, D> TimestamplessUpdateBuilder<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: TimestampManipulation + Lattice + Codec64 + Sync,
    D: Monoid + Ord + Codec64 + Send + Sync,
{
    /// Creates a builder against the given write handle, with every update
    /// pinned to `T::minimum()`.
    pub fn new(handle: &WriteHandle<K, V, T, D>) -> Self {
        let initial_ts = T::minimum();
        let builder = handle.builder(Antichain::from_elem(initial_ts.clone()));
        TimestamplessUpdateBuilder {
            builder,
            initial_ts,
        }
    }

    /// Adds a `(K, V, D)` triple to the batch at the builder's fixed
    /// timestamp.
    ///
    /// # Panics
    ///
    /// Panics if the underlying persist builder reports a usage error.
    pub async fn add(&mut self, k: &K, v: &V, d: &D) {
        self.builder
            .add(k, v, &self.initial_ts, d)
            .await
            .expect("invalid Persist usage");
    }

    /// Finishes the batch with an upper one step past the initial
    /// timestamp and returns it in transmittable (proto) form.
    ///
    /// # Panics
    ///
    /// Panics if the underlying persist builder reports a usage error.
    pub async fn finish(self) -> ProtoBatch {
        let finish_ts = StepForward::step_forward(&self.initial_ts);
        let batch = self
            .builder
            .finish(Antichain::from_elem(finish_ts))
            .await
            .expect("invalid Persist usage");

        batch.into_transmittable_batch()
    }
}
626
627impl TryIntoProtocolNonce for StorageCommand {
628 fn try_into_protocol_nonce(self) -> Result<Uuid, Self> {
629 match self {
630 StorageCommand::Hello { nonce } => Ok(nonce),
631 cmd => Err(cmd),
632 }
633 }
634}
635
#[cfg(test)]
mod tests {
    use super::*;

    /// Pins the in-memory size of `StorageCommand` so accidental growth
    /// (e.g. un-boxing a large payload) is caught in review.
    /// NOTE(review): the constant is layout-dependent; update deliberately
    /// if the enum changes.
    #[mz_ore::test]
    fn test_storage_command_size() {
        assert_eq!(std::mem::size_of::<StorageCommand>(), 40);
    }

    /// Same size pin for `StorageResponse`.
    #[mz_ore::test]
    fn test_storage_response_size() {
        assert_eq!(std::mem::size_of::<StorageResponse>(), 120);
    }
}