1#![allow(missing_docs)]
11
12use std::collections::{BTreeMap, BTreeSet};
15use std::fmt::Debug;
16use std::iter;
17
18use async_trait::async_trait;
19use differential_dataflow::difference::Monoid;
20use differential_dataflow::lattice::Lattice;
21use mz_cluster_client::ReplicaId;
22use mz_cluster_client::client::TryIntoProtocolNonce;
23use mz_ore::assert_none;
24use mz_persist_client::batch::{BatchBuilder, ProtoBatch};
25use mz_persist_client::write::WriteHandle;
26use mz_persist_types::{Codec, Codec64, StepForward};
27use mz_repr::{Diff, GlobalId, Row, Timestamp};
28use mz_service::client::{GenericClient, Partitionable, PartitionedState};
29use mz_storage_types::controller::CollectionMetadata;
30use mz_storage_types::oneshot_sources::OneshotIngestionRequest;
31use mz_storage_types::parameters::StorageParameters;
32use mz_storage_types::sinks::StorageSinkDesc;
33use mz_storage_types::sources::IngestionDescription;
34use serde::{Deserialize, Serialize};
35use smallvec::SmallVec;
36use timely::PartialOrder;
37use timely::progress::frontier::{Antichain, MutableAntichain};
38use uuid::Uuid;
39
40use crate::statistics::{SinkStatisticsUpdate, SourceStatisticsUpdate};
41
/// A client to a storage server: anything that can exchange `StorageCommand`s
/// for `StorageResponse`s.
pub trait StorageClient: GenericClient<StorageCommand, StorageResponse> {}

/// Blanket impl: every `GenericClient` over the storage protocol is a
/// `StorageClient`.
impl<C> StorageClient for C where C: GenericClient<StorageCommand, StorageResponse> {}
46
#[async_trait]
impl GenericClient<StorageCommand, StorageResponse> for Box<dyn StorageClient> {
    /// Forwards `cmd` to the boxed client.
    async fn send(&mut self, cmd: StorageCommand) -> Result<(), anyhow::Error> {
        (**self).send(cmd).await
    }

    /// Receives the next response from the boxed client.
    ///
    /// # Cancel safety
    ///
    /// Pure delegation with no buffering here, so this is exactly as cancel
    /// safe as the underlying client's `recv` implementation.
    async fn recv(&mut self) -> Result<Option<StorageResponse>, anyhow::Error> {
        (**self).recv().await
    }
}
63
/// Commands related to the ingress and egress of collections.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub enum StorageCommand {
    /// Handshake message carrying the protocol nonce; extracted by the
    /// `TryIntoProtocolNonce` impl at the bottom of this file.
    Hello {
        nonce: Uuid,
    },
    /// Marks the end of the initial command replay.
    /// NOTE(review): semantics inferred from the name; handled as a no-op in
    /// this file — confirm with the server implementation.
    InitializationComplete,
    /// NOTE(review): presumably lifts a read-only restriction on the
    /// receiving side — confirm; handled as a no-op in this file.
    AllowWrites,
    /// Updates storage-related configuration parameters.
    UpdateConfiguration(Box<StorageParameters>),
    /// Runs (installs) the described ingestion; registers frontier tracking
    /// for all of its collections (see `observe_command`).
    RunIngestion(Box<RunIngestionCommand>),
    /// Allows compaction of the identified collection up to the given
    /// frontier.
    AllowCompaction(GlobalId, Antichain<Timestamp>),
    /// Runs (installs) the described sink.
    RunSink(Box<RunSinkCommand>),
    /// Runs a oneshot ingestion; results come back via
    /// `StorageResponse::StagedBatches` keyed by the ingestion id.
    RunOneshotIngestion(Box<RunOneshotIngestion>),
    /// Cancels the oneshot ingestion with the given id.
    CancelOneshotIngestion(Uuid),
}
106
107impl StorageCommand {
108 pub fn installs_objects(&self) -> bool {
110 use StorageCommand::*;
111 match self {
112 Hello { .. }
113 | InitializationComplete
114 | AllowWrites
115 | UpdateConfiguration(_)
116 | AllowCompaction(_, _)
117 | CancelOneshotIngestion { .. } => false,
118 RunIngestion(_) | RunSink(_) | RunOneshotIngestion(_) => true,
122 }
123 }
124}
125
/// A command to run (install) an ingestion.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct RunIngestionCommand {
    /// The id of the ingestion to run.
    pub id: GlobalId,
    /// The description of the ingestion; its `collection_ids()` are
    /// registered for frontier tracking when this command is observed (see
    /// `observe_command`).
    pub description: IngestionDescription<CollectionMetadata>,
}
135
/// A command to run a oneshot ingestion.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct RunOneshotIngestion {
    /// Unique identifier for this ingestion; used to key the resulting
    /// `StagedBatches` responses and to target `CancelOneshotIngestion`.
    pub ingestion_id: uuid::Uuid,
    /// The id of the collection the ingested data is destined for.
    pub collection_id: GlobalId,
    /// Metadata of the destination collection.
    pub collection_meta: CollectionMetadata,
    /// The details of what to ingest.
    pub request: OneshotIngestionRequest,
}
148
/// A command to run (install) a sink.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct RunSinkCommand {
    /// The id of the sink to run; registered for frontier tracking when this
    /// command is observed (see `observe_command`).
    pub id: GlobalId,
    /// The description of the sink to run.
    pub description: StorageSinkDesc<CollectionMetadata>,
}
155
/// The lifecycle status of a storage object (source or sink).
///
/// String forms are defined by `to_str` and the `FromStr` impl below;
/// precedence between consecutive statuses is defined by `superseded_by`.
#[derive(
    Copy,
    Clone,
    Debug,
    Serialize,
    Deserialize,
    PartialEq,
    Eq,
    PartialOrd,
    Ord
)]
pub enum Status {
    Starting,
    Running,
    Paused,
    Stalled,
    // NOTE(review): presumably a terminal failure state distinct from
    // `Stalled` — confirm with the status-history documentation.
    Ceased,
    Dropped,
}
178
179impl std::str::FromStr for Status {
180 type Err = anyhow::Error;
181 fn from_str(s: &str) -> Result<Self, Self::Err> {
183 Ok(match s {
184 "starting" => Status::Starting,
185 "running" => Status::Running,
186 "paused" => Status::Paused,
187 "stalled" => Status::Stalled,
188 "ceased" => Status::Ceased,
189 "dropped" => Status::Dropped,
190 s => return Err(anyhow::anyhow!("{} is not a valid status", s)),
191 })
192 }
193}
194
195impl Status {
196 pub fn to_str(&self) -> &'static str {
198 match self {
199 Status::Starting => "starting",
200 Status::Running => "running",
201 Status::Paused => "paused",
202 Status::Stalled => "stalled",
203 Status::Ceased => "ceased",
204 Status::Dropped => "dropped",
205 }
206 }
207
208 pub fn superseded_by(self, new: Status) -> bool {
211 match (self, new) {
212 (_, Status::Dropped) => true,
213 (Status::Dropped, _) => false,
214 (Status::Paused, Status::Paused) => false,
216 _ => true,
219 }
220 }
221}
222
/// A status update for a storage object, rendered into a status-history row
/// by the `From<StatusUpdate> for Row` impl below.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct StatusUpdate {
    /// The storage object this status applies to.
    pub id: GlobalId,
    /// The new status.
    pub status: Status,
    /// When the status change occurred.
    pub timestamp: chrono::DateTime<chrono::Utc>,
    /// Optional human-readable error message.
    pub error: Option<String>,
    /// Hints, rendered as the `hints` list inside the details column.
    pub hints: BTreeSet<String>,
    /// Per-namespace error details, rendered as the `namespaced` map inside
    /// the details column.
    pub namespaced_errors: BTreeMap<String, String>,
    /// The replica reporting this status, if any.
    pub replica_id: Option<ReplicaId>,
}
238
239impl StatusUpdate {
240 pub fn new(
241 id: GlobalId,
242 timestamp: chrono::DateTime<chrono::Utc>,
243 status: Status,
244 ) -> StatusUpdate {
245 StatusUpdate {
246 id,
247 timestamp,
248 status,
249 error: None,
250 hints: Default::default(),
251 namespaced_errors: Default::default(),
252 replica_id: None,
253 }
254 }
255}
256
impl From<StatusUpdate> for Row {
    /// Renders a status update as a row with columns
    /// `(timestamp, id, status, error, details, replica_id)`, where `details`
    /// is a record containing optional `hints` (list) and `namespaced` (map)
    /// entries, or NULL when both are empty.
    fn from(update: StatusUpdate) -> Self {
        use mz_repr::Datum;

        let timestamp = Datum::TimestampTz(update.timestamp.try_into().expect("must fit"));
        let id = update.id.to_string();
        let id = Datum::String(&id);
        let status = Datum::String(update.status.to_str());
        // `None` error maps to Datum::Null via the Option -> Datum conversion.
        let error = update.error.as_deref().into();

        let mut row = Row::default();
        let mut packer = row.packer();
        packer.extend([timestamp, id, status, error]);

        // Details column: only materialize the record if there is anything to
        // put in it; otherwise pack NULL.
        if !update.hints.is_empty() || !update.namespaced_errors.is_empty() {
            packer.push_dict_with(|dict_packer| {
                // BTreeSet/BTreeMap iteration keeps dict keys sorted, which
                // the dict packer requires.
                if !update.hints.is_empty() {
                    dict_packer.push(Datum::String("hints"));
                    dict_packer.push_list(update.hints.iter().map(|s| Datum::String(s)));
                }
                if !update.namespaced_errors.is_empty() {
                    dict_packer.push(Datum::String("namespaced"));
                    dict_packer.push_dict(
                        update
                            .namespaced_errors
                            .iter()
                            .map(|(k, v)| (k.as_str(), Datum::String(v))),
                    );
                }
            });
        } else {
            packer.push(Datum::Null);
        }

        // Replica id column, NULL when not attributed to a replica.
        match update.replica_id {
            Some(id) => packer.push(Datum::String(&id.to_string())),
            None => packer.push(Datum::Null),
        }

        row
    }
}
301
302pub enum AppendOnlyUpdate {
304 Row((Row, Diff)),
305 Status(StatusUpdate),
306}
307
308impl AppendOnlyUpdate {
309 pub fn into_row(self) -> (Row, Diff) {
310 match self {
311 AppendOnlyUpdate::Row((row, diff)) => (row, diff),
312 AppendOnlyUpdate::Status(status) => (Row::from(status), Diff::ONE),
313 }
314 }
315}
316
impl From<(Row, Diff)> for AppendOnlyUpdate {
    /// Wraps a raw `(row, diff)` pair as `AppendOnlyUpdate::Row`.
    fn from((row, diff): (Row, Diff)) -> Self {
        Self::Row((row, diff))
    }
}

impl From<StatusUpdate> for AppendOnlyUpdate {
    /// Wraps a status update as `AppendOnlyUpdate::Status`.
    fn from(update: StatusUpdate) -> Self {
        Self::Status(update)
    }
}
328
/// Responses that a storage server sends back to the client.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub enum StorageResponse {
    /// The new upper frontier of the identified collection.
    FrontierUpper(GlobalId, Antichain<Timestamp>),
    /// Confirms that the identified collection has been dropped.
    DroppedId(GlobalId),
    /// Batches staged by oneshot ingestions, keyed by ingestion id; each
    /// entry carries the per-batch results (or errors) from one worker.
    StagedBatches(BTreeMap<uuid::Uuid, Vec<Result<ProtoBatch, String>>>),
    /// Source and sink statistics updates.
    StatisticsUpdates(Vec<SourceStatisticsUpdate>, Vec<SinkStatisticsUpdate>),
    /// A status change for a storage object.
    StatusUpdate(StatusUpdate),
}
344
/// Maintained state for partitioned storage clients.
///
/// Tracks, per collection, the upper frontier reported by each partition
/// together with their combined frontier, plus partially-assembled oneshot
/// ingestion responses, so that multiple partitions can be presented as one
/// logical server (see the `PartitionedState` impl below).
#[derive(Debug)]
pub struct PartitionedStorageState {
    /// The number of partitions this state tracks.
    parts: usize,
    /// Per-collection frontier state: the combined frontier across all
    /// partitions, and each partition's individual upper (`None` once that
    /// partition has reported the collection dropped).
    uppers: BTreeMap<
        GlobalId,
        (
            MutableAntichain<Timestamp>,
            Vec<Option<Antichain<Timestamp>>>,
        ),
    >,
    /// Oneshot-ingestion responses accumulated per partition, keyed by
    /// ingestion id; an entry is held until every partition has responded.
    oneshot_source_responses:
        BTreeMap<uuid::Uuid, BTreeMap<usize, Vec<Result<ProtoBatch, String>>>>,
}
366
impl Partitionable<StorageCommand, StorageResponse> for (StorageCommand, StorageResponse) {
    type PartitionedState = PartitionedStorageState;

    /// Creates partitioned-state tracking for `parts` partitions, with no
    /// collections registered yet.
    fn new(parts: usize) -> PartitionedStorageState {
        PartitionedStorageState {
            parts,
            uppers: BTreeMap::new(),
            oneshot_source_responses: BTreeMap::new(),
        }
    }
}
378
impl PartitionedStorageState {
    /// Records any bookkeeping required for `command` before it is broadcast
    /// to the partitions.
    fn observe_command(&mut self, command: &StorageCommand) {
        match command {
            StorageCommand::Hello { .. } => {}
            // Commands that install collections: begin tracking the upper
            // frontier of every collection they create.
            StorageCommand::RunIngestion(ingestion) => {
                self.insert_new_uppers(ingestion.description.collection_ids());
            }
            StorageCommand::RunSink(export) => {
                self.insert_new_uppers([export.id]);
            }
            // All other commands need no bookkeeping here.
            StorageCommand::InitializationComplete
            | StorageCommand::AllowWrites
            | StorageCommand::UpdateConfiguration(_)
            | StorageCommand::AllowCompaction(_, _)
            | StorageCommand::RunOneshotIngestion(_)
            | StorageCommand::CancelOneshotIngestion { .. } => {}
        }
    }

    /// Starts frontier tracking for the given collection ids; ids that are
    /// already tracked are left untouched.
    fn insert_new_uppers<I: IntoIterator<Item = GlobalId>>(&mut self, ids: I) {
        for id in ids {
            self.uppers.entry(id).or_insert_with(|| {
                // Seed the combined frontier with `parts` copies of
                // `Timestamp::MIN`, matching the per-partition frontiers
                // created below — each partition contributes one copy.
                let mut frontier = MutableAntichain::new();
                #[allow(clippy::as_conversions)]
                frontier.update_iter(iter::once((Timestamp::MIN, self.parts as i64)));
                let part_frontiers = vec![Some(Antichain::from_elem(Timestamp::MIN)); self.parts];

                (frontier, part_frontiers)
            });
        }
    }
}
424
impl PartitionedState<StorageCommand, StorageResponse> for PartitionedStorageState {
    /// Broadcasts `command` unchanged to every partition, after recording any
    /// collections it installs (see `observe_command`).
    fn split_command(&mut self, command: StorageCommand) -> Vec<Option<StorageCommand>> {
        self.observe_command(&command);

        vec![Some(command); self.parts]
    }

    /// Merges one partition's response into the combined view, returning
    /// `Some` only when there is something new to report upstream.
    ///
    /// # Panics
    ///
    /// Panics if a response references a collection or partition slot that is
    /// not being tracked, or on a duplicate drop/oneshot response — these
    /// indicate protocol violations.
    fn absorb_response(
        &mut self,
        shard_id: usize,
        response: StorageResponse,
    ) -> Option<Result<StorageResponse, anyhow::Error>> {
        match response {
            StorageResponse::FrontierUpper(id, new_shard_upper) => {
                let (frontier, shard_frontiers) = match self.uppers.get_mut(&id) {
                    Some(value) => value,
                    None => panic!("Reference to absent collection: {id}"),
                };
                let old_upper = frontier.frontier().to_owned();
                let shard_upper = match &mut shard_frontiers[shard_id] {
                    Some(shard_upper) => shard_upper,
                    None => panic!("Reference to absent shard {shard_id} for collection {id}"),
                };
                // Swap this shard's contribution to the combined frontier:
                // retract its previous elements, then insert the new ones.
                frontier.update_iter(shard_upper.iter().map(|t| (*t, -1)));
                frontier.update_iter(new_shard_upper.iter().map(|t| (*t, 1)));
                shard_upper.join_assign(&new_shard_upper);

                // Only report when the combined frontier actually advanced.
                let new_upper = frontier.frontier();
                if PartialOrder::less_than(&old_upper.borrow(), &new_upper) {
                    Some(Ok(StorageResponse::FrontierUpper(id, new_upper.to_owned())))
                } else {
                    None
                }
            }
            StorageResponse::DroppedId(id) => {
                let (_, shard_frontiers) = match self.uppers.get_mut(&id) {
                    Some(value) => value,
                    None => panic!("Reference to absent collection: {id}"),
                };
                // Clearing the slot marks this shard as having dropped the
                // collection; a second drop from the same shard is a bug.
                let prev = shard_frontiers[shard_id].take();
                assert!(
                    prev.is_some(),
                    "got double drop for {id} from shard {shard_id}"
                );

                // Report the drop only once every shard has confirmed it.
                if shard_frontiers.iter().all(Option::is_none) {
                    self.uppers.remove(&id);
                    Some(Ok(StorageResponse::DroppedId(id)))
                } else {
                    None
                }
            }
            StorageResponse::StatisticsUpdates(source_stats, sink_stats) => {
                // Statistics are forwarded per shard without any merging.
                Some(Ok(StorageResponse::StatisticsUpdates(
                    source_stats,
                    sink_stats,
                )))
            }
            StorageResponse::StatusUpdate(updates) => {
                Some(Ok(StorageResponse::StatusUpdate(updates)))
            }
            StorageResponse::StagedBatches(batches) => {
                let mut finished_batches = BTreeMap::new();

                for (collection_id, batches) in batches {
                    tracing::info!(%shard_id, %collection_id, "got batch");

                    // Stash this shard's batches; each shard may respond for
                    // a given ingestion at most once.
                    let entry = self
                        .oneshot_source_responses
                        .entry(collection_id)
                        .or_default();
                    let novel = entry.insert(shard_id, batches);
                    assert_none!(novel, "Duplicate oneshot source response");

                    // Emit a collection's batches only after every partition
                    // has responded for it.
                    if entry.len() == self.parts {
                        let entry = self
                            .oneshot_source_responses
                            .remove(&collection_id)
                            .expect("checked above");
                        let all_batches: Vec<_> = entry.into_values().flatten().collect();

                        finished_batches.insert(collection_id, all_batches);
                    }
                }

                if !finished_batches.is_empty() {
                    Some(Ok(StorageResponse::StagedBatches(finished_batches)))
                } else {
                    None
                }
            }
        }
    }
}
526
/// A timestamped row update with a multiplicity.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct Update {
    /// The affected row.
    pub row: Row,
    /// The time at which the update takes effect.
    pub timestamp: Timestamp,
    /// The multiplicity of the update (positive insert, negative retract).
    pub diff: Diff,
}
534
/// A row update with a multiplicity but no timestamp; the timestamp is
/// supplied later, when the update is applied.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct TimestamplessUpdate {
    /// The affected row.
    pub row: Row,
    /// The multiplicity of the update.
    pub diff: Diff,
}
544
/// Data destined for a table, in one of two forms.
#[derive(Debug, Clone, PartialEq)]
pub enum TableData {
    /// Rows with their multiplicities.
    Rows(Vec<(Row, Diff)>),
    /// Pre-built persist batches (`ProtoBatch`); usually a single batch,
    /// hence the inline capacity of one.
    Batches(SmallVec<[ProtoBatch; 1]>),
}
554
555impl TableData {
556 pub fn is_empty(&self) -> bool {
557 match self {
558 TableData::Rows(rows) => rows.is_empty(),
559 TableData::Batches(batches) => batches.is_empty(),
560 }
561 }
562}
563
/// Builds a persist batch of "timestampless" updates: every update is staged
/// at a fixed placeholder timestamp (`Timestamp::MIN`, see `new`).
/// NOTE(review): presumably the batch is re-timestamped when it is finally
/// appended — confirm with the append path.
pub struct TimestamplessUpdateBuilder<K, V, D>
where
    K: Codec,
    V: Codec,
    D: Codec64,
{
    /// The underlying persist batch builder, pinned to `Timestamp`.
    builder: BatchBuilder<K, V, Timestamp, D>,
    /// The placeholder timestamp every staged update carries.
    initial_ts: Timestamp,
}
575
impl<K, V, D> TimestamplessUpdateBuilder<K, V, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    D: Monoid + Ord + Codec64 + Send + Sync,
{
    /// Creates a builder on top of `handle` whose updates are all staged at
    /// the placeholder timestamp `Timestamp::MIN`.
    pub fn new(handle: &WriteHandle<K, V, Timestamp, D>) -> Self {
        let initial_ts = Timestamp::MIN;
        let builder = handle.builder(Antichain::from_elem(initial_ts));
        TimestamplessUpdateBuilder {
            builder,
            initial_ts,
        }
    }

    /// Stages a `(k, v, d)` update in the batch at the placeholder timestamp.
    ///
    /// # Panics
    ///
    /// Panics if the underlying persist builder rejects the update
    /// ("invalid Persist usage").
    pub async fn add(&mut self, k: &K, v: &V, d: &D) {
        self.builder
            .add(k, v, &self.initial_ts, d)
            .await
            .expect("invalid Persist usage");
    }

    /// Finishes the batch with an upper one step past the placeholder
    /// timestamp and returns it in transmittable (proto) form.
    ///
    /// # Panics
    ///
    /// Panics if the underlying persist builder rejects the finish
    /// ("invalid Persist usage").
    pub async fn finish(self) -> ProtoBatch {
        let finish_ts = StepForward::step_forward(&self.initial_ts);
        let batch = self
            .builder
            .finish(Antichain::from_elem(finish_ts))
            .await
            .expect("invalid Persist usage");

        batch.into_transmittable_batch()
    }
}
616
impl TryIntoProtocolNonce for StorageCommand {
    /// Extracts the nonce from a `Hello` command; any other command is
    /// returned unchanged as the error.
    fn try_into_protocol_nonce(self) -> Result<Uuid, Self> {
        match self {
            StorageCommand::Hello { nonce } => Ok(nonce),
            cmd => Err(cmd),
        }
    }
}
625
#[cfg(test)]
mod tests {
    use super::*;

    // These tests pin the in-memory size of the protocol types so that
    // accidental growth (e.g. un-boxing a large variant) is caught at test
    // time rather than silently bloating every command/response in flight.

    #[mz_ore::test]
    fn test_storage_command_size() {
        assert_eq!(std::mem::size_of::<StorageCommand>(), 40);
    }

    #[mz_ore::test]
    fn test_storage_response_size() {
        assert_eq!(std::mem::size_of::<StorageResponse>(), 120);
    }
}