1#![allow(missing_docs)]
11
12use std::collections::{BTreeMap, BTreeSet};
15use std::fmt::Debug;
16use std::iter;
17
18use async_trait::async_trait;
19use differential_dataflow::difference::Semigroup;
20use differential_dataflow::lattice::Lattice;
21use mz_cluster_client::ReplicaId;
22use mz_cluster_client::client::TryIntoProtocolNonce;
23use mz_ore::assert_none;
24use mz_persist_client::batch::{BatchBuilder, ProtoBatch};
25use mz_persist_client::write::WriteHandle;
26use mz_persist_types::{Codec, Codec64, StepForward};
27use mz_repr::{Diff, GlobalId, Row, TimestampManipulation};
28use mz_service::client::{GenericClient, Partitionable, PartitionedState};
29use mz_storage_types::controller::CollectionMetadata;
30use mz_storage_types::oneshot_sources::OneshotIngestionRequest;
31use mz_storage_types::parameters::StorageParameters;
32use mz_storage_types::sinks::StorageSinkDesc;
33use mz_storage_types::sources::IngestionDescription;
34use serde::{Deserialize, Serialize};
35use smallvec::SmallVec;
36use timely::PartialOrder;
37use timely::progress::Timestamp;
38use timely::progress::frontier::{Antichain, MutableAntichain};
39use uuid::Uuid;
40
41use crate::statistics::{SinkStatisticsUpdate, SourceStatisticsUpdate};
42
/// A client to a storage server: anything that can exchange
/// [`StorageCommand`]s for [`StorageResponse`]s.
///
/// This is a trait alias over [`GenericClient`]; the blanket impl below makes
/// every such client a `StorageClient` automatically.
pub trait StorageClient<T = mz_repr::Timestamp>:
    GenericClient<StorageCommand<T>, StorageResponse<T>>
{
}
48
49impl<C, T> StorageClient<T> for C where C: GenericClient<StorageCommand<T>, StorageResponse<T>> {}
50
// Allows a boxed trait object to be used wherever a concrete client is
// expected, by forwarding both halves of the protocol to the inner client.
#[async_trait]
impl<T: Send> GenericClient<StorageCommand<T>, StorageResponse<T>> for Box<dyn StorageClient<T>> {
    /// Forwards the command to the boxed inner client.
    async fn send(&mut self, cmd: StorageCommand<T>) -> Result<(), anyhow::Error> {
        (**self).send(cmd).await
    }

    /// Forwards to the boxed inner client.
    ///
    /// NOTE(review): per the `GenericClient` contract, `Ok(None)` presumably
    /// signals end-of-stream — confirm against `mz_service::client` docs.
    async fn recv(&mut self) -> Result<Option<StorageResponse<T>>, anyhow::Error> {
        (**self).recv().await
    }
}
67
/// Commands sent from a storage controller to a storage server.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub enum StorageCommand<T = mz_repr::Timestamp> {
    /// Handshake message carrying the protocol nonce; extracted by the
    /// `TryIntoProtocolNonce` impl at the bottom of this file.
    Hello {
        nonce: Uuid,
    },
    /// Signals that the sender has finished replaying initialization
    /// commands.
    InitializationComplete,
    /// Permits the server to start affecting external systems with writes.
    /// NOTE(review): exact gating semantics live on the server side — confirm
    /// there.
    AllowWrites,
    /// Updates server-side storage configuration parameters. Boxed to keep
    /// the enum small (see `test_storage_command_size`).
    UpdateConfiguration(Box<StorageParameters>),
    /// Starts (or updates) the ingestion described by the command.
    RunIngestion(Box<RunIngestionCommand>),
    /// Allows the identified collection's state to be compacted to the given
    /// frontier.
    AllowCompaction(GlobalId, Antichain<T>),
    /// Starts (or updates) the sink described by the command.
    RunSink(Box<RunSinkCommand<T>>),
    /// Starts a oneshot (one-time, bounded) ingestion.
    RunOneshotIngestion(Box<RunOneshotIngestion>),
    /// Cancels the oneshot ingestion previously started with this UUID.
    CancelOneshotIngestion(Uuid),
}
110
111impl<T> StorageCommand<T> {
112 pub fn installs_objects(&self) -> bool {
114 use StorageCommand::*;
115 match self {
116 Hello { .. }
117 | InitializationComplete
118 | AllowWrites
119 | UpdateConfiguration(_)
120 | AllowCompaction(_, _)
121 | CancelOneshotIngestion { .. } => false,
122 RunIngestion(_) | RunSink(_) | RunOneshotIngestion(_) => true,
126 }
127 }
128}
129
/// A command to start an ingestion.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct RunIngestionCommand {
    /// The id of the storage collection being ingested.
    pub id: GlobalId,
    /// The description of what source type should be ingested and what its
    /// outputs are, with collection metadata attached.
    pub description: IngestionDescription<CollectionMetadata>,
}
139
/// A command to start a oneshot (bounded, one-time) ingestion.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct RunOneshotIngestion {
    /// Unique identifier for this ingestion; also used to cancel it via
    /// [`StorageCommand::CancelOneshotIngestion`].
    pub ingestion_id: uuid::Uuid,
    /// The id of the collection the ingestion writes into.
    pub collection_id: GlobalId,
    /// Metadata for the destination collection.
    pub collection_meta: CollectionMetadata,
    /// Details of the data to ingest.
    pub request: OneshotIngestionRequest,
}
152
/// A command to start a sink.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct RunSinkCommand<T> {
    /// The id of the sink.
    pub id: GlobalId,
    /// The description of the sink, with collection metadata attached.
    pub description: StorageSinkDesc<CollectionMetadata, T>,
}
159
/// The status of an object (source/sink) as reported to status history.
///
/// Variants are ordered; `Ord`/`PartialOrd` derive from declaration order.
#[derive(Copy, Clone, Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub enum Status {
    Starting,
    Running,
    Paused,
    Stalled,
    /// The object has stopped on its own and will not resume.
    /// NOTE(review): precise distinction from `Dropped` is enforced by
    /// callers — confirm against status-history producers.
    Ceased,
    /// Terminal state: the object has been dropped. See `superseded_by`.
    Dropped,
}
172
173impl std::str::FromStr for Status {
174 type Err = anyhow::Error;
175 fn from_str(s: &str) -> Result<Self, Self::Err> {
177 Ok(match s {
178 "starting" => Status::Starting,
179 "running" => Status::Running,
180 "paused" => Status::Paused,
181 "stalled" => Status::Stalled,
182 "ceased" => Status::Ceased,
183 "dropped" => Status::Dropped,
184 s => return Err(anyhow::anyhow!("{} is not a valid status", s)),
185 })
186 }
187}
188
189impl Status {
190 pub fn to_str(&self) -> &'static str {
192 match self {
193 Status::Starting => "starting",
194 Status::Running => "running",
195 Status::Paused => "paused",
196 Status::Stalled => "stalled",
197 Status::Ceased => "ceased",
198 Status::Dropped => "dropped",
199 }
200 }
201
202 pub fn superseded_by(self, new: Status) -> bool {
205 match (self, new) {
206 (_, Status::Dropped) => true,
207 (Status::Dropped, _) => false,
208 (Status::Paused, Status::Paused) => false,
210 _ => true,
213 }
214 }
215}
216
/// A single status-history record for an object, optionally annotated with
/// an error, hints, namespaced errors, and the reporting replica.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct StatusUpdate {
    /// The object whose status is being reported.
    pub id: GlobalId,
    /// The new status.
    pub status: Status,
    /// Wall-clock time of the status change.
    pub timestamp: chrono::DateTime<chrono::Utc>,
    /// Optional human-readable error message.
    pub error: Option<String>,
    /// Optional hints, rendered under the "hints" key of the details column
    /// (see the `From<StatusUpdate> for Row` impl).
    pub hints: BTreeSet<String>,
    /// Errors keyed by namespace, rendered under the "namespaced" key.
    pub namespaced_errors: BTreeMap<String, String>,
    /// The replica that reported the status, if any.
    pub replica_id: Option<ReplicaId>,
}
232
233impl StatusUpdate {
234 pub fn new(
235 id: GlobalId,
236 timestamp: chrono::DateTime<chrono::Utc>,
237 status: Status,
238 ) -> StatusUpdate {
239 StatusUpdate {
240 id,
241 timestamp,
242 status,
243 error: None,
244 hints: Default::default(),
245 namespaced_errors: Default::default(),
246 replica_id: None,
247 }
248 }
249}
250
// Renders a status update as a row for the status-history collection.
// Column order is load-bearing: (occurred_at, id, status, error, details,
// replica_id) — it must match the collection's relation description.
// NOTE(review): confirm against the status-history table definition.
impl From<StatusUpdate> for Row {
    fn from(update: StatusUpdate) -> Self {
        use mz_repr::Datum;

        let timestamp = Datum::TimestampTz(update.timestamp.try_into().expect("must fit"));
        let id = update.id.to_string();
        let id = Datum::String(&id);
        let status = Datum::String(update.status.to_str());
        let error = update.error.as_deref().into();

        let mut row = Row::default();
        let mut packer = row.packer();
        packer.extend([timestamp, id, status, error]);

        // The "details" column: a record holding optional "hints" and
        // "namespaced" entries, or NULL when both are empty.
        if !update.hints.is_empty() || !update.namespaced_errors.is_empty() {
            packer.push_dict_with(|dict_packer| {
                // `push_dict_with` requires keys in ascending order:
                // "hints" sorts before "namespaced".
                if !update.hints.is_empty() {
                    dict_packer.push(Datum::String("hints"));
                    dict_packer.push_list(update.hints.iter().map(|s| Datum::String(s)));
                }
                if !update.namespaced_errors.is_empty() {
                    dict_packer.push(Datum::String("namespaced"));
                    dict_packer.push_dict(
                        update
                            .namespaced_errors
                            .iter()
                            .map(|(k, v)| (k.as_str(), Datum::String(v))),
                    );
                }
            });
        } else {
            packer.push(Datum::Null);
        }

        // The "replica_id" column, NULL when the update is not attributed to
        // a specific replica.
        match update.replica_id {
            Some(id) => packer.push(Datum::String(&id.to_string())),
            None => packer.push(Datum::Null),
        }

        row
    }
}
295
/// An update destined for an append-only collection: either a raw row with a
/// diff, or a status update that is rendered to a row on demand.
pub enum AppendOnlyUpdate {
    Row((Row, Diff)),
    Status(StatusUpdate),
}
301
302impl AppendOnlyUpdate {
303 pub fn into_row(self) -> (Row, Diff) {
304 match self {
305 AppendOnlyUpdate::Row((row, diff)) => (row, diff),
306 AppendOnlyUpdate::Status(status) => (Row::from(status), Diff::ONE),
307 }
308 }
309}
310
311impl From<(Row, Diff)> for AppendOnlyUpdate {
312 fn from((row, diff): (Row, Diff)) -> Self {
313 Self::Row((row, diff))
314 }
315}
316
317impl From<StatusUpdate> for AppendOnlyUpdate {
318 fn from(update: StatusUpdate) -> Self {
319 Self::Status(update)
320 }
321}
322
/// Responses sent from a storage server back to the storage controller.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub enum StorageResponse<T = mz_repr::Timestamp> {
    /// The new upper frontier of the identified collection.
    FrontierUpper(GlobalId, Antichain<T>),
    /// Acknowledges that the identified object has been dropped.
    DroppedId(GlobalId),
    /// Batches staged by oneshot ingestions, keyed by ingestion UUID; each
    /// batch is either a `ProtoBatch` or an error string.
    StagedBatches(BTreeMap<uuid::Uuid, Vec<Result<ProtoBatch, String>>>),
    /// Source and sink statistics, forwarded without aggregation (see the
    /// `PartitionedState` impl below).
    StatisticsUpdates(Vec<SourceStatisticsUpdate>, Vec<SinkStatisticsUpdate>),
    /// A status change for an object.
    StatusUpdate(StatusUpdate),
}
338
/// Maintains the state necessary to present a single coherent storage client
/// over `parts` partitioned (sharded) servers.
#[derive(Debug)]
pub struct PartitionedStorageState<T> {
    /// Number of partitions the client is partitioned into.
    parts: usize,
    /// Per collection: the joint upper frontier (as a `MutableAntichain` over
    /// all shards' contributions) alongside each shard's last reported upper.
    /// A shard's entry becomes `None` once it reports `DroppedId`.
    uppers: BTreeMap<GlobalId, (MutableAntichain<T>, Vec<Option<Antichain<T>>>)>,
    /// Staged-batch responses per oneshot ingestion, keyed by shard, buffered
    /// until every shard has responded.
    oneshot_source_responses:
        BTreeMap<uuid::Uuid, BTreeMap<usize, Vec<Result<ProtoBatch, String>>>>,
}
354
355impl<T> Partitionable<StorageCommand<T>, StorageResponse<T>>
356 for (StorageCommand<T>, StorageResponse<T>)
357where
358 T: timely::progress::Timestamp + Lattice,
359{
360 type PartitionedState = PartitionedStorageState<T>;
361
362 fn new(parts: usize) -> PartitionedStorageState<T> {
363 PartitionedStorageState {
364 parts,
365 uppers: BTreeMap::new(),
366 oneshot_source_responses: BTreeMap::new(),
367 }
368 }
369}
370
371impl<T> PartitionedStorageState<T>
372where
373 T: timely::progress::Timestamp,
374{
375 fn observe_command(&mut self, command: &StorageCommand<T>) {
376 let _ = match command {
383 StorageCommand::Hello { .. } => {}
384 StorageCommand::RunIngestion(ingestion) => {
385 self.insert_new_uppers(ingestion.description.collection_ids());
386 }
387 StorageCommand::RunSink(export) => {
388 self.insert_new_uppers([export.id]);
389 }
390 StorageCommand::InitializationComplete
391 | StorageCommand::AllowWrites
392 | StorageCommand::UpdateConfiguration(_)
393 | StorageCommand::AllowCompaction(_, _)
394 | StorageCommand::RunOneshotIngestion(_)
395 | StorageCommand::CancelOneshotIngestion { .. } => {}
396 };
397 }
398
399 fn insert_new_uppers<I: IntoIterator<Item = GlobalId>>(&mut self, ids: I) {
405 for id in ids {
406 self.uppers.entry(id).or_insert_with(|| {
407 let mut frontier = MutableAntichain::new();
408 #[allow(clippy::as_conversions)]
411 frontier.update_iter(iter::once((T::minimum(), self.parts as i64)));
412 let part_frontiers = vec![Some(Antichain::from_elem(T::minimum())); self.parts];
413
414 (frontier, part_frontiers)
415 });
416 }
417 }
418}
419
impl<T> PartitionedState<StorageCommand<T>, StorageResponse<T>> for PartitionedStorageState<T>
where
    T: timely::progress::Timestamp + Lattice,
{
    /// Broadcasts the command to every partition, after recording any new
    /// collections it installs.
    fn split_command(&mut self, command: StorageCommand<T>) -> Vec<Option<StorageCommand<T>>> {
        self.observe_command(&command);

        vec![Some(command); self.parts]
    }

    /// Absorbs one shard's response, returning `Some` only when a combined
    /// response should be surfaced to the caller.
    fn absorb_response(
        &mut self,
        shard_id: usize,
        response: StorageResponse<T>,
    ) -> Option<Result<StorageResponse<T>, anyhow::Error>> {
        match response {
            StorageResponse::FrontierUpper(id, new_shard_upper) => {
                // Fold the shard's new upper into the collection's joint
                // frontier; surface a response only if the joint frontier
                // actually advanced.
                let (frontier, shard_frontiers) = match self.uppers.get_mut(&id) {
                    Some(value) => value,
                    None => panic!("Reference to absent collection: {id}"),
                };
                let old_upper = frontier.frontier().to_owned();
                let shard_upper = match &mut shard_frontiers[shard_id] {
                    Some(shard_upper) => shard_upper,
                    None => panic!("Reference to absent shard {shard_id} for collection {id}"),
                };
                // Retract the shard's previous contribution and add the new
                // one to the joint MutableAntichain.
                frontier.update_iter(shard_upper.iter().map(|t| (t.clone(), -1)));
                frontier.update_iter(new_shard_upper.iter().map(|t| (t.clone(), 1)));
                shard_upper.join_assign(&new_shard_upper);

                let new_upper = frontier.frontier();
                if PartialOrder::less_than(&old_upper.borrow(), &new_upper) {
                    Some(Ok(StorageResponse::FrontierUpper(id, new_upper.to_owned())))
                } else {
                    // Joint frontier unchanged; suppress the response.
                    None
                }
            }
            StorageResponse::DroppedId(id) => {
                // Record the drop for this shard; forward the response (and
                // drop all tracking state) only once every shard reported it.
                let (_, shard_frontiers) = match self.uppers.get_mut(&id) {
                    Some(value) => value,
                    None => panic!("Reference to absent collection: {id}"),
                };
                let prev = shard_frontiers[shard_id].take();
                assert!(
                    prev.is_some(),
                    "got double drop for {id} from shard {shard_id}"
                );

                if shard_frontiers.iter().all(Option::is_none) {
                    self.uppers.remove(&id);
                    Some(Ok(StorageResponse::DroppedId(id)))
                } else {
                    None
                }
            }
            StorageResponse::StatisticsUpdates(source_stats, sink_stats) => {
                // Statistics are forwarded per shard as-is; any aggregation
                // happens downstream.
                Some(Ok(StorageResponse::StatisticsUpdates(
                    source_stats,
                    sink_stats,
                )))
            }
            StorageResponse::StatusUpdate(updates) => {
                // Status updates pass through unchanged.
                Some(Ok(StorageResponse::StatusUpdate(updates)))
            }
            StorageResponse::StagedBatches(batches) => {
                // Buffer each shard's staged batches per ingestion; once all
                // `parts` shards have responded for an ingestion, emit their
                // batches as one combined response.
                let mut finished_batches = BTreeMap::new();

                for (collection_id, batches) in batches {
                    tracing::info!(%shard_id, %collection_id, "got batch");

                    let entry = self
                        .oneshot_source_responses
                        .entry(collection_id)
                        .or_default();
                    let novel = entry.insert(shard_id, batches);
                    assert_none!(novel, "Duplicate oneshot source response");

                    if entry.len() == self.parts {
                        let entry = self
                            .oneshot_source_responses
                            .remove(&collection_id)
                            .expect("checked above");
                        let all_batches: Vec<_> = entry.into_values().flatten().collect();

                        finished_batches.insert(collection_id, all_batches);
                    }
                }

                if !finished_batches.is_empty() {
                    Some(Ok(StorageResponse::StagedBatches(finished_batches)))
                } else {
                    None
                }
            }
        }
    }
}
524
/// A single update to a collection: a row, the timestamp at which it occurs,
/// and its multiplicity.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct Update<T = mz_repr::Timestamp> {
    pub row: Row,
    pub timestamp: T,
    pub diff: Diff,
}
532
/// An update without a timestamp; the writer assigns one at append time.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct TimestamplessUpdate {
    pub row: Row,
    pub diff: Diff,
}
542
/// Data destined for a table, in one of two forms.
#[derive(Debug, Clone, PartialEq)]
pub enum TableData {
    /// Rows (with diffs) that still need timestamps assigned.
    Rows(Vec<(Row, Diff)>),
    /// Batches already staged in Persist that can be appended directly.
    /// `SmallVec<[_; 1]>` because a single batch is the common case.
    Batches(SmallVec<[ProtoBatch; 1]>),
}
552
553impl TableData {
554 pub fn is_empty(&self) -> bool {
555 match self {
556 TableData::Rows(rows) => rows.is_empty(),
557 TableData::Batches(batches) => batches.is_empty(),
558 }
559 }
560}
561
/// Builds a Persist batch whose updates all carry a single placeholder
/// timestamp (`T::minimum()`), to be re-timestamped by the eventual writer.
pub struct TimestamplessUpdateBuilder<K, V, T, D>
where
    K: Codec,
    V: Codec,
    T: Timestamp + Lattice + Codec64,
    D: Codec64,
{
    /// The underlying Persist batch builder.
    builder: BatchBuilder<K, V, T, D>,
    /// The placeholder timestamp assigned to every update.
    initial_ts: T,
}
574
575impl<K, V, T, D> TimestamplessUpdateBuilder<K, V, T, D>
576where
577 K: Debug + Codec,
578 V: Debug + Codec,
579 T: TimestampManipulation + Lattice + Codec64 + Sync,
580 D: Semigroup + Ord + Codec64 + Send + Sync,
581{
582 pub fn new(handle: &WriteHandle<K, V, T, D>) -> Self {
585 let initial_ts = T::minimum();
586 let builder = handle.builder(Antichain::from_elem(initial_ts.clone()));
587 TimestamplessUpdateBuilder {
588 builder,
589 initial_ts,
590 }
591 }
592
593 pub async fn add(&mut self, k: &K, v: &V, d: &D) {
595 self.builder
596 .add(k, v, &self.initial_ts, d)
597 .await
598 .expect("invalid Persist usage");
599 }
600
601 pub async fn finish(self) -> ProtoBatch {
606 let finish_ts = StepForward::step_forward(&self.initial_ts);
607 let batch = self
608 .builder
609 .finish(Antichain::from_elem(finish_ts))
610 .await
611 .expect("invalid Persist usage");
612
613 batch.into_transmittable_batch()
614 }
615}
616
617impl TryIntoProtocolNonce for StorageCommand {
618 fn try_into_protocol_nonce(self) -> Result<Uuid, Self> {
619 match self {
620 StorageCommand::Hello { nonce } => Ok(nonce),
621 cmd => Err(cmd),
622 }
623 }
624}
625
#[cfg(test)]
mod tests {
    use super::*;

    // Size regression guards: the protocol enums are cloned and moved
    // frequently, so growth (e.g. an unboxed large variant) should be a
    // deliberate decision that updates these numbers.
    #[mz_ore::test]
    fn test_storage_command_size() {
        assert_eq!(std::mem::size_of::<StorageCommand>(), 40);
    }

    #[mz_ore::test]
    fn test_storage_response_size() {
        assert_eq!(std::mem::size_of::<StorageResponse>(), 120);
    }
}