//! Introspection of storage utilization by persist

use std::collections::BTreeMap;
use std::sync::Arc;
use std::time::Instant;

use futures::stream::{FuturesUnordered, StreamExt};
use mz_ore::cast::CastFrom;
use mz_persist::location::Blob;
use tokio::sync::Semaphore;
use tracing::{error, info};

use crate::cfg::{PersistConfig, USAGE_STATE_FETCH_CONCURRENCY_LIMIT};
use crate::internal::paths::{BlobKey, BlobKeyPrefix, PartialBlobKey, WriterKey};
use crate::internal::state::HollowBlobRef;
use crate::internal::state_versions::StateVersions;
use crate::{Metrics, PersistClient, ShardId, retry_external};

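/// The blob (S3) usage of a shard, as computed from the blobs referenced by
/// its live versions of state.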
#[derive(Clone, Debug)]
pub struct ShardUsageReferenced {
    pub(crate) batches_bytes: u64,
    pub(crate) rollup_bytes: u64,
}

impl ShardUsageReferenced {
    pub fn size_bytes(&self) -> u64 {
        let Self {
            batches_bytes,
            rollup_bytes,
        } = self;
        *batches_bytes + *rollup_bytes
    }
}

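/// The referenced blob usage of a set of shards, keyed by shard.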
#[derive(Debug)]
pub struct ShardsUsageReferenced {
    pub by_shard: BTreeMap<ShardId, ShardUsageReferenced>,
}

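/// A breakdown of a shard's blob (S3) usage into categories.
///
/// The categories form a funnel, with each quantity a subset of the one
/// before it. Summing back up (these identities are exactly the accessor
/// methods below):
///
/// * total = leaked + not_leaked
/// * not_leaked = not_leaked_not_referenced + referenced
/// * referenced = referenced_not_current_state + current_state
/// * current_state = current_state_rollups + current_state_batches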
#[derive(Clone, Debug)]
pub struct ShardUsageAudit {
    /// Bytes of batch parts referenced by the current version of state.
    pub current_state_batches_bytes: u64,
    /// Bytes of rollups referenced by the current version of state.
    pub current_state_rollups_bytes: u64,
    /// Bytes referenced by some live-but-not-current version of state.
    pub referenced_not_current_state_bytes: u64,
    /// Bytes that no live version of state references, but that we can't yet
    /// classify as leaked (e.g. an in-progress write from a live writer).
    pub not_leaked_not_referenced_bytes: u64,
    /// Bytes that no live version of state references and whose writer is no
    /// longer live: these can never be used and are eligible for cleanup.
    pub leaked_bytes: u64,
}

impl ShardUsageAudit {
    /// Bytes referenced by the current version of state.
    pub fn current_state_bytes(&self) -> u64 {
        self.current_state_batches_bytes + self.current_state_rollups_bytes
    }

    /// Bytes referenced by any live version of state.
    pub fn referenced_bytes(&self) -> u64 {
        self.current_state_bytes() + self.referenced_not_current_state_bytes
    }

    /// Bytes that are not yet eligible for cleanup.
    pub fn not_leaked_bytes(&self) -> u64 {
        self.referenced_bytes() + self.not_leaked_not_referenced_bytes
    }

    /// The total blob (S3) bytes used by this shard.
    pub fn total_bytes(&self) -> u64 {
        self.not_leaked_bytes() + self.leaked_bytes
    }
}

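/// The blob usage audit of a set of shards, keyed by shard.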
#[derive(Clone, Debug)]
pub struct ShardsUsageAudit {
    /// The usage audit of each shard.
    pub by_shard: BTreeMap<ShardId, ShardUsageAudit>,
    /// Bytes under blob keys that could not be attributed to any shard (e.g.
    /// unparseable keys).
    pub unattributable_bytes: u64,
}

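/// A raw accounting of everything under a blob key prefix, before any
/// reconciliation against state.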
#[derive(Clone, Debug, Default)]
struct BlobUsage {
    by_shard: BTreeMap<ShardId, ShardBlobUsage>,
    unattributable_bytes: u64,
    batch_part_bytes: u64,
    batch_part_count: u64,
    rollup_size: u64,
    rollup_count: u64,
    total_size: u64,
    total_count: u64,
}

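/// The raw blob usage of a single shard, with batch part bytes grouped by the
/// writer that wrote them.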
#[derive(Clone, Debug, Default)]
struct ShardBlobUsage {
    by_writer: BTreeMap<WriterKey, u64>,
    rollup_bytes: u64,
}

impl ShardBlobUsage {
    fn total_bytes(&self) -> u64 {
        self.by_writer.values().copied().sum::<u64>() + self.rollup_bytes
    }
}

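/// A client for computing the blob (S3) storage usage of persist shards.
///
/// A minimal sketch of typical usage, assuming an already-opened
/// [PersistClient] named `persist_client` (hypothetical setup; any client
/// works):
///
/// ```ignore
/// let usage = StorageUsageClient::open(persist_client);
/// let audit = usage.shards_usage_audit().await;
/// for (shard_id, shard_usage) in audit.by_shard {
///     println!("{}:\n{}", shard_id, shard_usage);
/// }
/// println!("unattributable: {}", HumanBytes(audit.unattributable_bytes));
/// ```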
#[derive(Clone, Debug)]
pub struct StorageUsageClient {
    cfg: PersistConfig,
    blob: Arc<dyn Blob>,
    metrics: Arc<Metrics>,
    state_versions: Arc<StateVersions>,
}

impl StorageUsageClient {
    /// Creates a new storage usage client from an existing [PersistClient].
    pub fn open(client: PersistClient) -> Self {
        let state_versions = Arc::new(StateVersions::new(
            client.cfg.clone(),
            Arc::clone(&client.consensus),
            Arc::clone(&client.blob),
            Arc::clone(&client.metrics),
        ));
        StorageUsageClient {
            cfg: client.cfg,
            blob: client.blob,
            metrics: client.metrics,
            state_versions,
        }
    }

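    /// Computes [ShardUsageReferenced] for a single shard.
    ///
    /// Because this only sums blobs referenced by some live version of state,
    /// it undercounts actual blob usage: it misses not-yet-referenced
    /// in-progress writes and leaked blobs. See [Self::shard_usage_audit] for
    /// a more expensive, more complete accounting.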
    pub async fn shard_usage_referenced(&self, shard_id: ShardId) -> ShardUsageReferenced {
        let mut start = Instant::now();
        let states_iter = self
            .state_versions
            .fetch_all_live_states::<u64>(shard_id)
            .await;
        let states_iter = match states_iter {
            Some(x) => x,
            None => {
                return ShardUsageReferenced {
                    batches_bytes: 0,
                    rollup_bytes: 0,
                };
            }
        };
        let mut states_iter = states_iter
            .check_ts_codec()
            .expect("ts should be a u64 in all prod shards");

        let shard_metrics = &self.metrics.shards.shard(&shard_id, "unknown");
        shard_metrics
            .gc_live_diffs
            .set(u64::cast_from(states_iter.len()));

        let now = Instant::now();
        self.metrics
            .audit
            .step_state
            .inc_by(now.duration_since(start).as_secs_f64());
        start = now;

        let mut batches_bytes = 0;
        let mut rollup_bytes = 0;
        while let Some(_) = states_iter.next(|diff| {
            diff.referenced_blobs().for_each(|blob| match blob {
                HollowBlobRef::Batch(batch) => {
                    batches_bytes += batch.encoded_size_bytes();
                }
                HollowBlobRef::Rollup(rollup) => {
                    // Rollup size metadata is optional; fall back to a
                    // placeholder of 1 byte.
                    rollup_bytes += rollup.encoded_size_bytes.unwrap_or(1);
                }
            })
        }) {}

223
224 let referenced = ShardUsageReferenced {
225 batches_bytes: u64::cast_from(batches_bytes),
226 rollup_bytes: u64::cast_from(rollup_bytes),
227 };
228
229 let current_state_sizes = states_iter.state().size_metrics();
230 shard_metrics
231 .usage_current_state_batches_bytes
232 .set(u64::cast_from(current_state_sizes.state_batches_bytes));
233 shard_metrics
234 .usage_current_state_rollups_bytes
235 .set(u64::cast_from(current_state_sizes.state_rollups_bytes));
236 shard_metrics.usage_referenced_not_current_state_bytes.set(
237 referenced.size_bytes()
238 - u64::cast_from(
239 current_state_sizes.state_batches_bytes
240 + current_state_sizes.state_rollups_bytes,
241 ),
242 );
243
244 self.metrics
245 .audit
246 .step_math
247 .inc_by(now.duration_since(start).as_secs_f64());
248
249 referenced
250 }
251
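    /// Computes [ShardUsageReferenced] for a set of shards, fetching the
    /// state of up to [USAGE_STATE_FETCH_CONCURRENCY_LIMIT] shards
    /// concurrently.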
    pub async fn shards_usage_referenced<I>(&self, shard_ids: I) -> ShardsUsageReferenced
    where
        I: IntoIterator<Item = ShardId>,
    {
        let semaphore = Arc::new(Semaphore::new(
            USAGE_STATE_FETCH_CONCURRENCY_LIMIT.get(&self.cfg),
        ));
        let by_shard_futures = FuturesUnordered::new();
        for shard_id in shard_ids {
            let semaphore = Arc::clone(&semaphore);
            let shard_usage_fut = async move {
                let _permit = semaphore
                    .acquire()
                    .await
                    .expect("acquiring permit from open semaphore");
                let shard_usage = self.shard_usage_referenced(shard_id).await;
                (shard_id, shard_usage)
            };
            by_shard_futures.push(shard_usage_fut);
        }
        let by_shard = by_shard_futures.collect().await;
        ShardsUsageReferenced { by_shard }
    }

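    /// Computes [ShardUsageAudit] for a single shard.
    ///
    /// Unlike the "referenced" methods, this lists and attributes every blob
    /// under the shard's prefix, so it also accounts for leaked blobs and
    /// not-yet-referenced in-progress writes.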
    pub async fn shard_usage_audit(&self, shard_id: ShardId) -> ShardUsageAudit {
        let mut blob_usage = self.blob_raw_usage(BlobKeyPrefix::Shard(&shard_id)).await;
        let blob_usage = blob_usage.by_shard.remove(&shard_id).unwrap_or_default();
        self.shard_usage_given_blob_usage(shard_id, &blob_usage)
            .await
    }

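    /// Computes [ShardUsageAudit] for every shard in an environment, via a
    /// single scan of the blob store.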
    pub async fn shards_usage_audit(&self) -> ShardsUsageAudit {
        let blob_usage = self.blob_raw_usage(BlobKeyPrefix::All).await;
        self.metrics
            .audit
            .blob_batch_part_bytes
            .set(blob_usage.batch_part_bytes);
        self.metrics
            .audit
            .blob_batch_part_count
            .set(blob_usage.batch_part_count);
        self.metrics
            .audit
            .blob_rollup_bytes
            .set(blob_usage.rollup_size);
        self.metrics
            .audit
            .blob_rollup_count
            .set(blob_usage.rollup_count);
        self.metrics.audit.blob_bytes.set(blob_usage.total_size);
        self.metrics.audit.blob_count.set(blob_usage.total_count);

        let semaphore = Semaphore::new(USAGE_STATE_FETCH_CONCURRENCY_LIMIT.get(&self.cfg));
        let by_shard_futures = FuturesUnordered::new();
        for (shard_id, total_bytes) in blob_usage.by_shard.iter() {
            let shard_usage_fut = async {
                let _permit = semaphore
                    .acquire()
                    .await
                    .expect("acquiring permit from open semaphore");
                let shard_usage = self
                    .shard_usage_given_blob_usage(*shard_id, total_bytes)
                    .await;
                (*shard_id, shard_usage)
            };
            by_shard_futures.push(shard_usage_fut);
        }

        let by_shard = by_shard_futures.collect().await;
        ShardsUsageAudit {
            by_shard,
            unattributable_bytes: blob_usage.unattributable_bytes,
        }
    }

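    /// Lists and parses every blob key under `prefix`, accumulating raw sizes
    /// and counts by shard and writer.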
    async fn blob_raw_usage(&self, prefix: BlobKeyPrefix<'_>) -> BlobUsage {
        retry_external(
            &self.metrics.retries.external.storage_usage_shard_size,
            || async {
                let mut start = Instant::now();
                let mut keys = 0;
                let mut usage = BlobUsage::default();
                self.blob
                    .list_keys_and_metadata(&prefix.to_string(), &mut |metadata| {
                        keys += 1;
                        // Flush the step timing metric periodically, since a
                        // full listing can take a long time.
                        if keys % 100 == 0 {
                            let now = Instant::now();
                            self.metrics
                                .audit
                                .step_blob_metadata
                                .inc_by(now.duration_since(start).as_secs_f64());
                            start = now;
                        }

                        match BlobKey::parse_ids(metadata.key) {
                            Ok((shard, partial_blob_key)) => {
                                let shard_usage = usage.by_shard.entry(shard).or_default();

                                match partial_blob_key {
                                    PartialBlobKey::Batch(writer_id, _) => {
                                        usage.batch_part_bytes += metadata.size_in_bytes;
                                        usage.batch_part_count += 1;
                                        *shard_usage.by_writer.entry(writer_id).or_default() +=
                                            metadata.size_in_bytes;
                                    }
                                    PartialBlobKey::Rollup(_, _) => {
                                        usage.rollup_size += metadata.size_in_bytes;
                                        usage.rollup_count += 1;
                                        shard_usage.rollup_bytes += metadata.size_in_bytes;
                                    }
                                }
                            }
                            _ => {
                                info!("unknown blob: {}: {}", metadata.key, metadata.size_in_bytes);
                                usage.unattributable_bytes += metadata.size_in_bytes;
                            }
                        }
                        usage.total_size += metadata.size_in_bytes;
                        usage.total_count += 1;
                    })
                    .await?;
                self.metrics
                    .audit
                    .step_blob_metadata
                    .inc_by(start.elapsed().as_secs_f64());
                Ok(usage)
            },
        )
        .await
    }

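    /// Reconciles a shard's raw blob usage against its live versions of
    /// state, attributing each byte to a [ShardUsageAudit] category.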
    async fn shard_usage_given_blob_usage(
        &self,
        shard_id: ShardId,
        blob_usage: &ShardBlobUsage,
    ) -> ShardUsageAudit {
        let mut start = Instant::now();
        let states_iter = self
            .state_versions
            .fetch_all_live_states::<u64>(shard_id)
            .await;
        let states_iter = match states_iter {
            Some(x) => x,
            None => {
                error!(
                    concat!(
                        "shard {} existed in blob but not in consensus. This should be quite rare in ",
                        "prod, but is semi-expected in development if `bin/environmentd --reset` gets ",
                        "interrupted"
                    ),
                    shard_id
                );
                return ShardUsageAudit {
                    current_state_batches_bytes: 0,
                    current_state_rollups_bytes: 0,
                    referenced_not_current_state_bytes: 0,
                    not_leaked_not_referenced_bytes: 0,
                    leaked_bytes: blob_usage.total_bytes(),
                };
            }
        };
        let mut states_iter = states_iter
            .check_ts_codec()
            .expect("ts should be a u64 in all prod shards");
        let now = Instant::now();
        self.metrics
            .audit
            .step_state
            .inc_by(now.duration_since(start).as_secs_f64());
        start = now;

        let shard_metrics = self.metrics.shards.shard(&shard_id, "unknown");
        shard_metrics
            .gc_live_diffs
            .set(u64::cast_from(states_iter.len()));

        let mut referenced_batches_bytes = BTreeMap::new();
        let mut referenced_other_bytes = 0;
        while let Some(_) = states_iter.next(|x| {
            x.referenced_blobs().for_each(|x| match x {
                HollowBlobRef::Batch(x) => {
                    for part in x.parts.iter() {
                        if let Some(writer_id) = part.writer_key() {
                            let writer_referenced_batches_bytes =
                                referenced_batches_bytes.entry(writer_id).or_default();
                            *writer_referenced_batches_bytes += u64::cast_from(part.hollow_bytes());
                        } else {
                            referenced_other_bytes += u64::cast_from(part.hollow_bytes());
                        }
                    }
                }
                HollowBlobRef::Rollup(x) => {
                    referenced_other_bytes +=
                        u64::cast_from(x.encoded_size_bytes.unwrap_or_default());
                }
            })
        }) {}

        let mut current_state_batches_bytes = 0;
        let mut current_state_rollups_bytes = 0;
        states_iter.state().blobs().for_each(|x| match x {
            HollowBlobRef::Batch(x) => {
                for part in x.parts.iter() {
                    current_state_batches_bytes += u64::cast_from(part.hollow_bytes());
                }
            }
            HollowBlobRef::Rollup(x) => {
                current_state_rollups_bytes +=
                    u64::cast_from(x.encoded_size_bytes.unwrap_or_default());
            }
        });
        let current_state_bytes = current_state_batches_bytes + current_state_rollups_bytes;

        let ret = ShardUsageAudit::from(ShardUsageCumulativeMaybeRacy {
            current_state_batches_bytes,
            current_state_bytes,
            referenced_other_bytes,
            referenced_batches_bytes: &referenced_batches_bytes,
            // A writer is considered live if its key is at least the key
            // computed from the current build's version.
            minimum_key: WriterKey::for_version(&self.cfg.build_version),
            blob_usage,
        });

        // Sanity check that the funnel adds back up to the raw blob usage.
        assert_eq!(ret.total_bytes(), blob_usage.total_bytes());

        shard_metrics
            .usage_current_state_batches_bytes
            .set(ret.current_state_batches_bytes);
        shard_metrics
            .usage_current_state_rollups_bytes
            .set(ret.current_state_rollups_bytes);
        shard_metrics
            .usage_referenced_not_current_state_bytes
            .set(ret.referenced_not_current_state_bytes);
        shard_metrics
            .usage_not_leaked_not_referenced_bytes
            .set(ret.not_leaked_not_referenced_bytes);
        shard_metrics.usage_leaked_bytes.set(ret.leaked_bytes);

        self.metrics
            .audit
            .step_math
            .inc_by(start.elapsed().as_secs_f64());
        ret
    }

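    /// Returns the total size (in bytes) of all blobs under the given
    /// [BlobKeyPrefix]. Test-only, since it requires a full listing of the
    /// prefix.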
    #[cfg(test)]
    async fn size(
        &self,
        prefix: BlobKeyPrefix<'_>,
    ) -> Result<u64, mz_persist::location::ExternalError> {
        let mut total_size = 0;
        self.blob
            .list_keys_and_metadata(&prefix.to_string(), &mut |metadata| {
                total_size += metadata.size_in_bytes;
            })
            .await?;
        Ok(total_size)
    }
}

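/// Inputs to the [ShardUsageAudit] math, gathered from separate, non-atomic
/// reads of blob and state (hence "maybe racy").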
#[derive(Debug)]
struct ShardUsageCumulativeMaybeRacy<'a> {
    current_state_batches_bytes: u64,
    current_state_bytes: u64,
    referenced_other_bytes: u64,
    referenced_batches_bytes: &'a BTreeMap<WriterKey, u64>,
    minimum_key: WriterKey,
    blob_usage: &'a ShardBlobUsage,
}

impl From<ShardUsageCumulativeMaybeRacy<'_>> for ShardUsageAudit {
    fn from(x: ShardUsageCumulativeMaybeRacy<'_>) -> Self {
        let mut not_leaked_bytes = 0;
        let mut total_bytes = 0;
        for (writer_key, bytes) in x.blob_usage.by_writer.iter() {
            total_bytes += *bytes;
            let writer_key_is_live = *writer_key >= x.minimum_key;
            if writer_key_is_live {
                not_leaked_bytes += *bytes;
            } else {
                // This writer is no longer live, so any of its bytes that are
                // not referenced by some live version of state are leaked.
                let writer_referenced =
                    x.referenced_batches_bytes.get(writer_key).map_or(0, |x| *x);
                // Due to races, state may reference more of this writer's
                // bytes than the blob listing saw; don't count more than we
                // actually have.
                not_leaked_bytes += std::cmp::min(*bytes, writer_referenced);
            }
        }
        // All rollup bytes are counted as not leaked; this math makes no
        // attempt to detect leaked rollups.
        total_bytes += x.blob_usage.rollup_bytes;
        not_leaked_bytes += x.blob_usage.rollup_bytes;

        let leaked_bytes = total_bytes
            .checked_sub(not_leaked_bytes)
            .expect("blob inputs should be cumulative");
        let referenced_batches_bytes = x.referenced_batches_bytes.values().sum::<u64>();
        let referenced_bytes = referenced_batches_bytes + x.referenced_other_bytes;
        let mut referenced_not_current_state_bytes = referenced_bytes
            .checked_sub(x.current_state_bytes)
            .expect("state inputs should be cumulative");
        let mut current_state_rollups_bytes = x
            .current_state_bytes
            .checked_sub(x.current_state_batches_bytes)
            .expect("state inputs should be cumulative");
        let mut current_state_batches_bytes = x.current_state_batches_bytes;

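        // Because these inputs are not read atomically, state may reference
        // bytes that the earlier blob listing never saw (e.g. a batch written
        // and linked into state after the listing finished). When that
        // happens, referenced_bytes can exceed not_leaked_bytes and the
        // buckets above would double count the difference. Compute how much
        // we might have over-counted and absorb it into the buckets below, in
        // order from least interesting to most interesting.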
        let mut not_leaked_not_referenced_bytes = not_leaked_bytes.saturating_sub(referenced_bytes);
        let mut possible_over_count = referenced_bytes.saturating_sub(not_leaked_bytes);
        fn adjust(adjustment: &mut u64, val: &mut u64) {
            let x = std::cmp::min(*adjustment, *val);
            *adjustment -= x;
            *val -= x;
        }
        adjust(
            &mut possible_over_count,
            &mut not_leaked_not_referenced_bytes,
        );
        adjust(
            &mut possible_over_count,
            &mut referenced_not_current_state_bytes,
        );
        adjust(&mut possible_over_count, &mut current_state_rollups_bytes);
        adjust(&mut possible_over_count, &mut current_state_batches_bytes);
        // The buckets are large enough, by construction, to absorb the whole
        // over-count.
        assert_eq!(possible_over_count, 0);

        let ret = ShardUsageAudit {
            current_state_batches_bytes,
            current_state_rollups_bytes,
            referenced_not_current_state_bytes,
            not_leaked_not_referenced_bytes,
            leaked_bytes,
        };

        // Sanity check that the output funnel math is internally consistent.
        debug_assert_eq!(ret.total_bytes(), total_bytes);
        debug_assert_eq!(ret.not_leaked_bytes(), not_leaked_bytes);
        debug_assert!(ret.referenced_bytes() <= referenced_bytes);
        debug_assert!(ret.current_state_bytes() <= x.current_state_bytes);
        debug_assert!(ret.current_state_batches_bytes <= x.current_state_batches_bytes);
        ret
    }
}

impl std::fmt::Display for ShardUsageAudit {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            concat!(
                "total s3 contents: {}\n",
                "  leaked: {}\n",
                "  not leaked: {}\n",
                "    not leaked not referenced: {}\n",
                "    referenced: {}\n",
                "      referenced not current state: {}\n",
                "      current state: {}\n",
                "        current rollups: {}\n",
                "        current batches: {}",
            ),
            HumanBytes(self.total_bytes()),
            HumanBytes(self.leaked_bytes),
            HumanBytes(self.not_leaked_bytes()),
            HumanBytes(self.not_leaked_not_referenced_bytes),
            HumanBytes(self.referenced_bytes()),
            HumanBytes(self.referenced_not_current_state_bytes),
            HumanBytes(self.current_state_bytes()),
            HumanBytes(self.current_state_rollups_bytes),
            HumanBytes(self.current_state_batches_bytes),
        )
    }
}

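/// A newtype wrapper that pretty-prints a byte count using binary units
/// (B/KiB/MiB/GiB/TiB).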
pub(crate) struct HumanBytes(pub u64);

impl std::fmt::Display for HumanBytes {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        if self.0 < 1_240u64 {
            return write!(f, "{}B", self.0);
        }
        #[allow(clippy::as_conversions)]
        let mut bytes = self.0 as f64 / 1_024f64;
        if bytes < 1_240f64 {
            return write!(f, "{:.1}KiB", bytes);
        }
        bytes = bytes / 1_024f64;
        if bytes < 1_240f64 {
            return write!(f, "{:.1}MiB", bytes);
        }
        bytes = bytes / 1_024f64;
        if bytes < 1_240f64 {
            return write!(f, "{:.1}GiB", bytes);
        }
        bytes = bytes / 1_024f64;
        write!(f, "{:.1}TiB", bytes)
    }
}

#[cfg(test)]
mod tests {
    use bytes::Bytes;
    use mz_dyncfg::ConfigUpdates;
    use mz_persist::location::SeqNo;
    use semver::Version;
    use timely::progress::Antichain;

    use crate::ShardId;
    use crate::batch::{
        BLOB_TARGET_SIZE, BatchBuilderConfig, INLINE_WRITES_SINGLE_MAX_BYTES,
        INLINE_WRITES_TOTAL_MAX_BYTES,
    };
    use crate::internal::paths::{PartialRollupKey, RollupId};
    use crate::tests::new_test_client;

    use super::*;

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn size(dyncfgs: ConfigUpdates) {
        let data = vec![
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
            (("4".to_owned(), "four".to_owned()), 4, 1),
        ];

        let client = new_test_client(&dyncfgs).await;
        let inline_writes_enabled = INLINE_WRITES_SINGLE_MAX_BYTES.get(&client.cfg) > 0;
        let build_version = client.cfg.build_version.clone();
        let shard_id_one = ShardId::new();
        let shard_id_two = ShardId::new();

        // Write to shard one.
        let (mut write, _) = client
            .expect_open::<String, String, u64, i64>(shard_id_one)
            .await;
        write.expect_append(&data[..1], vec![0], vec![2]).await;

        // Write to shard two (this handle's key is writer_one).
        let (mut write, _) = client
            .expect_open::<String, String, u64, i64>(shard_id_two)
            .await;
        write.expect_append(&data[1..3], vec![0], vec![4]).await;
        let writer_one = WriterKey::Id(write.writer_id.clone());

        // Write to shard two again from a new writer (writer_two).
        let (mut write, _) = client
            .expect_open::<String, String, u64, i64>(shard_id_two)
            .await;
        write.expect_append(&data[4..], vec![0], vec![5]).await;
        let writer_two = WriterKey::Id(write.writer_id.clone());

        let usage = StorageUsageClient::open(client);

        let shard_one_size = usage
            .size(BlobKeyPrefix::Shard(&shard_id_one))
            .await
            .expect("must have shard size");
        let shard_two_size = usage
            .size(BlobKeyPrefix::Shard(&shard_id_two))
            .await
            .expect("must have shard size");
        let writer_one_size = usage
            .size(BlobKeyPrefix::Writer(&shard_id_two, &writer_one))
            .await
            .expect("must have shard size");
        let writer_two_size = usage
            .size(BlobKeyPrefix::Writer(&shard_id_two, &writer_two))
            .await
            .expect("must have shard size");
        let versioned_size = usage
            .size(BlobKeyPrefix::Writer(
                &shard_id_two,
                &WriterKey::for_version(&build_version),
            ))
            .await
            .expect("must have shard size");
        let rollups_size = usage
            .size(BlobKeyPrefix::Rollups(&shard_id_two))
            .await
            .expect("must have shard size");
        let all_size = usage
            .size(BlobKeyPrefix::All)
            .await
            .expect("must have shard size");

        assert!(shard_one_size > 0);
        assert!(shard_two_size > 0);
        if inline_writes_enabled {
            // With inline writes, both shards may have only rollups in blob,
            // so allow the sizes to be equal.
            assert!(shard_one_size <= shard_two_size);
        } else {
            assert!(shard_one_size < shard_two_size);
        }
        assert_eq!(
            shard_two_size,
            writer_one_size + writer_two_size + versioned_size + rollups_size
        );
        assert_eq!(all_size, shard_one_size + shard_two_size);

        assert_eq!(
            usage.shard_usage_audit(shard_id_one).await.total_bytes(),
            shard_one_size
        );
        assert_eq!(
            usage.shard_usage_audit(shard_id_two).await.total_bytes(),
            shard_two_size
        );

        let shards_usage = usage.shards_usage_audit().await;
        assert_eq!(shards_usage.by_shard.len(), 2);
        assert_eq!(
            shards_usage
                .by_shard
                .get(&shard_id_one)
                .map(|x| x.total_bytes()),
            Some(shard_one_size)
        );
        assert_eq!(
            shards_usage
                .by_shard
                .get(&shard_id_two)
                .map(|x| x.total_bytes()),
            Some(shard_two_size)
        );
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn usage_sanity(dyncfgs: ConfigUpdates) {
        let data = vec![
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
            (("4".to_owned(), "four".to_owned()), 4, 1),
        ];

        let shard_id = ShardId::new();
        let mut client = new_test_client(&dyncfgs).await;
        let inline_writes_enabled = INLINE_WRITES_SINGLE_MAX_BYTES.get(&client.cfg) > 0;

        let (mut write0, _) = client
            .expect_open::<String, String, u64, i64>(shard_id)
            .await;
        write0.expect_compare_and_append(&data[..2], 0, 3).await;
        // Create a batch and then "forget" it, so its blob is never linked
        // into state.
        let batch = write0
            .batch(&data[..2], Antichain::from_elem(0), Antichain::from_elem(3))
            .await
            .unwrap();
        std::mem::forget(batch);

        let (mut write1, _) = client
            .expect_open::<String, String, u64, i64>(shard_id)
            .await;

        write1.expect_compare_and_append(&data[2..], 3, 5).await;
        // Another forgotten batch, this time from a writer that then expires.
        let batch = write1
            .batch(&data[2..], Antichain::from_elem(3), Antichain::from_elem(5))
            .await
            .unwrap();
        std::mem::forget(batch);
        write1.expire().await;

        // Write a rollup.
        let maintenance = write0.machine.add_rollup_for_current_seqno().await;
        maintenance.perform(&write0.machine, &write0.gc).await;

        // Pretend we're running at a later version, so the writers above are
        // no longer considered live and their forgotten batches count as
        // leaked.
        client.cfg.build_version.minor += 1;
        let usage = StorageUsageClient::open(client);
        let shard_usage_audit = usage.shard_usage_audit(shard_id).await;
        let shard_usage_referenced = usage.shard_usage_referenced(shard_id).await;
        if !inline_writes_enabled {
            // The appended data is still referenced by the current state.
            assert!(shard_usage_audit.current_state_batches_bytes > 0);
            assert!(shard_usage_referenced.batches_bytes > 0);
        }
        assert!(shard_usage_audit.current_state_rollups_bytes > 0);
        assert!(shard_usage_referenced.rollup_bytes > 0);
        // Regardless of inline writes, the shard has bytes (e.g. rollups)
        // that no live version of state references but that are also not
        // considered leaked.
        assert!(shard_usage_audit.not_leaked_not_referenced_bytes > 0);
        if !inline_writes_enabled {
            // The forgotten batches above are leaked.
            assert!(shard_usage_audit.leaked_bytes > 0);
        }
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn usage_referenced(dyncfgs: ConfigUpdates) {
        mz_ore::test::init_logging();

        let data = vec![
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
            (("4".to_owned(), "four".to_owned()), 4, 1),
        ];

        let shard_id = ShardId::new();
        let mut client = new_test_client(&dyncfgs).await;
        // Disable compaction, so the batches we write below stay exactly as
        // we wrote them.
        client.cfg.compaction_enabled = false;
        client.cfg.set_config(&BLOB_TARGET_SIZE, 0);
        // If inline writes are entirely disabled, the appends below flush
        // their batches to blob as backpressure.
        let backpressure_would_flush = INLINE_WRITES_TOTAL_MAX_BYTES.get(&client.cfg) == 0;

        let (mut write, _read) = client
            .expect_open::<String, String, u64, i64>(shard_id)
            .await;

        let mut b1 = write.expect_batch(&data[..2], 0, 3).await;
        let mut b2 = write.expect_batch(&data[2..], 2, 5).await;
        if backpressure_would_flush {
            let cfg = BatchBuilderConfig::new(&client.cfg, shard_id);
            b1.flush_to_blob(
                &cfg,
                &client.metrics.user,
                &client.isolated_runtime,
                &write.write_schemas,
            )
            .await;
            b2.flush_to_blob(
                &cfg,
                &client.metrics.user,
                &client.isolated_runtime,
                &write.write_schemas,
            )
            .await;
        }

        let batches_size =
            u64::cast_from(b1.batch.encoded_size_bytes() + b2.batch.encoded_size_bytes());

        write
            .expect_compare_and_append_batch(&mut [&mut b1], 0, 3)
            .await;
        write
            .expect_compare_and_append_batch(&mut [&mut b2], 3, 5)
            .await;

        let usage = StorageUsageClient::open(client);
        let shard_usage_referenced = usage.shard_usage_referenced(shard_id).await;

        // The referenced batch bytes should be exactly the size of the
        // batches we just wrote.
        assert_eq!(shard_usage_referenced.batches_bytes, batches_size);
    }

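    /// A shorthand for a [ShardUsageCumulativeMaybeRacy] input plus, via
    /// `run`, an expected [ShardUsageAudit] rendered as:
    /// "total leaked/not_leaked not_leaked_not_referenced/referenced
    /// referenced_not_current_state/current_state
    /// current_state_rollups/current_state_batches".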
    struct TestCase {
        current_state_batches_bytes: u64,
        current_state_bytes: u64,
        referenced_other_bytes: u64,
        referenced_batches_bytes: Vec<(WriterKey, u64)>,
        min_writer_key: WriterKey,
        blob_usage_by_writer: Vec<(WriterKey, u64)>,
        blob_usage_rollups: u64,
    }

    impl TestCase {
        #[track_caller]
        fn run(&self, expected: &str) {
            let referenced_batches_bytes = self
                .referenced_batches_bytes
                .iter()
                .map(|(id, b)| (id.clone(), *b))
                .collect();
            let blob_usage = ShardBlobUsage {
                by_writer: self
                    .blob_usage_by_writer
                    .iter()
                    .map(|(id, b)| (id.clone(), *b))
                    .collect(),
                rollup_bytes: self.blob_usage_rollups,
            };
            let input = ShardUsageCumulativeMaybeRacy {
                current_state_batches_bytes: self.current_state_batches_bytes,
                current_state_bytes: self.current_state_bytes,
                referenced_other_bytes: self.referenced_other_bytes,
                referenced_batches_bytes: &referenced_batches_bytes,
                minimum_key: self.min_writer_key.clone(),
                blob_usage: &blob_usage,
            };
            let usage = ShardUsageAudit::from(input);
            let actual = format!(
                "{} {}/{} {}/{} {}/{} {}/{}",
                usage.total_bytes(),
                usage.leaked_bytes,
                usage.not_leaked_bytes(),
                usage.not_leaked_not_referenced_bytes,
                usage.referenced_bytes(),
                usage.referenced_not_current_state_bytes,
                usage.current_state_bytes(),
                usage.current_state_rollups_bytes,
                usage.current_state_batches_bytes
            );
            assert_eq!(actual, expected);
        }
    }

    fn version(minor: u64) -> WriterKey {
        WriterKey::for_version(&Version::new(0, minor, 0))
    }

    #[mz_ore::test]
    fn usage_kitchen_sink() {
        TestCase {
            // Exercise all the categories at once: some current state
            // (batches and a rollup), some referenced-but-not-current state,
            // a live writer, and a no-longer-live writer with both referenced
            // and leaked bytes.
            current_state_batches_bytes: 1,
            current_state_bytes: 2,
            referenced_other_bytes: 3,
            referenced_batches_bytes: vec![(version(3), 4), (version(2), 5)],
            min_writer_key: version(3),
            blob_usage_by_writer: vec![(version(3), 7), (version(2), 8)],
            blob_usage_rollups: 6,
        }
        .run("21 3/18 6/12 10/2 1/1");
    }

    #[mz_ore::test]
    fn usage_funnel() {
        // 1 byte in current state batches.
        TestCase {
            current_state_batches_bytes: 1,
            current_state_bytes: 1,
            referenced_other_bytes: 0,
            referenced_batches_bytes: vec![(version(3), 1)],
            min_writer_key: version(3),
            blob_usage_by_writer: vec![(version(3), 1)],
            blob_usage_rollups: 0,
        }
        .run("1 0/1 0/1 0/1 0/1");

        // 1 byte in current state rollups.
        TestCase {
            current_state_batches_bytes: 0,
            current_state_bytes: 1,
            referenced_other_bytes: 0,
            referenced_batches_bytes: vec![(version(3), 1)],
            min_writer_key: version(3),
            blob_usage_by_writer: vec![(version(3), 1)],
            blob_usage_rollups: 0,
        }
        .run("1 0/1 0/1 0/1 1/0");

        // 1 byte referenced, but not by the current state.
        TestCase {
            current_state_batches_bytes: 0,
            current_state_bytes: 0,
            referenced_other_bytes: 0,
            referenced_batches_bytes: vec![(version(3), 1)],
            min_writer_key: version(3),
            blob_usage_by_writer: vec![(version(3), 1)],
            blob_usage_rollups: 0,
        }
        .run("1 0/1 0/1 1/0 0/0");

        // 1 byte not leaked (live writer), but not referenced.
        TestCase {
            current_state_batches_bytes: 0,
            current_state_bytes: 0,
            referenced_other_bytes: 0,
            referenced_batches_bytes: vec![],
            min_writer_key: version(3),
            blob_usage_by_writer: vec![(version(3), 1)],
            blob_usage_rollups: 0,
        }
        .run("1 0/1 1/0 0/0 0/0");

        // 1 byte leaked (no-longer-live writer, unreferenced).
        TestCase {
            current_state_batches_bytes: 0,
            current_state_bytes: 0,
            referenced_other_bytes: 0,
            referenced_batches_bytes: vec![],
            min_writer_key: version(3),
            blob_usage_by_writer: vec![(version(2), 1)],
            blob_usage_rollups: 0,
        }
        .run("1 1/0 0/0 0/0 0/0");

        // No bytes at all.
        TestCase {
            current_state_batches_bytes: 0,
            current_state_bytes: 0,
            referenced_other_bytes: 0,
            referenced_batches_bytes: vec![],
            min_writer_key: version(3),
            blob_usage_by_writer: vec![],
            blob_usage_rollups: 0,
        }
        .run("0 0/0 0/0 0/0 0/0");
    }

    #[mz_ore::test]
    fn usage_races() {
        // These cases simulate the blob listing racing with the state
        // fetches. As the listing sees fewer and fewer of the bytes that
        // state references, the possible over-count is absorbed by the funnel
        // buckets in order: not-leaked-not-referenced first, then
        // referenced-not-current-state, current rollups, and finally current
        // batches.
        //
        // Baseline, no race: the no-longer-live writer has 2 leaked bytes.
        TestCase {
            current_state_batches_bytes: 2,
            current_state_bytes: 4,
            referenced_other_bytes: 2,
            referenced_batches_bytes: vec![(version(3), 4)],
            min_writer_key: version(3),
            blob_usage_by_writer: vec![(version(3), 8), (version(2), 2)],
            blob_usage_rollups: 0,
        }
        .run("10 2/8 2/6 2/4 2/2");

        // The listing missed one of the no-longer-live writer's bytes.
        TestCase {
            current_state_batches_bytes: 2,
            current_state_bytes: 4,
            referenced_other_bytes: 2,
            referenced_batches_bytes: vec![(version(3), 4)],
            min_writer_key: version(3),
            blob_usage_by_writer: vec![(version(3), 8), (version(2), 1)],
            blob_usage_rollups: 0,
        }
        .run("9 1/8 2/6 2/4 2/2");

        // The no-longer-live writer's bytes are gone entirely and the live
        // writer's listing comes up short: not-leaked-not-referenced shrinks.
        TestCase {
            current_state_batches_bytes: 2,
            current_state_bytes: 4,
            referenced_other_bytes: 2,
            referenced_batches_bytes: vec![(version(3), 4)],
            min_writer_key: version(3),
            blob_usage_by_writer: vec![(version(3), 7)],
            blob_usage_rollups: 0,
        }
        .run("7 0/7 1/6 2/4 2/2");

        // The over-count eats into referenced-not-current-state.
        TestCase {
            current_state_batches_bytes: 2,
            current_state_bytes: 4,
            referenced_other_bytes: 2,
            referenced_batches_bytes: vec![(version(3), 4)],
            min_writer_key: version(3),
            blob_usage_by_writer: vec![(version(3), 5)],
            blob_usage_rollups: 0,
        }
        .run("5 0/5 0/5 1/4 2/2");

        // ...then into current state rollups.
        TestCase {
            current_state_batches_bytes: 2,
            current_state_bytes: 4,
            referenced_other_bytes: 2,
            referenced_batches_bytes: vec![(version(3), 4)],
            min_writer_key: version(3),
            blob_usage_by_writer: vec![(version(3), 3)],
            blob_usage_rollups: 0,
        }
        .run("3 0/3 0/3 0/3 1/2");

        // ...and finally into current state batches.
        TestCase {
            current_state_batches_bytes: 2,
            current_state_bytes: 4,
            referenced_other_bytes: 2,
            referenced_batches_bytes: vec![(version(3), 4)],
            min_writer_key: version(3),
            blob_usage_by_writer: vec![(version(3), 1)],
            blob_usage_rollups: 0,
        }
        .run("1 0/1 0/1 0/1 0/1");
    }

    /// Regression test: a no-longer-live writer whose referenced bytes exceed
    /// the bytes the blob listing saw for it must not panic the audit math.
    #[mz_ore::test]
    fn usage_regression_referenced_greater_than_blob() {
        TestCase {
            current_state_batches_bytes: 0,
            current_state_bytes: 0,
            referenced_other_bytes: 0,
            referenced_batches_bytes: vec![(version(3), 5)],
            min_writer_key: version(10),
            blob_usage_by_writer: vec![(version(3), 3)],
            blob_usage_rollups: 0,
        }
        .run("3 0/3 0/3 3/0 0/0");
    }

    /// Regression test: a shard that exists in blob but not in consensus has
    /// all of its blob bytes counted as leaked.
    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn usage_regression_shard_in_blob_not_consensus(dyncfgs: ConfigUpdates) {
        let client = new_test_client(&dyncfgs).await;
        let shard_id = ShardId::new();

        // Manually write a rollup-shaped blob for a shard that has no state
        // in consensus.
        let key = PartialRollupKey::new(SeqNo(1), &RollupId::new());
        let key = key.complete(&shard_id);
        let () = client
            .blob
            .set(&key, Bytes::from(vec![0, 1, 2]))
            .await
            .unwrap();
        let usage = StorageUsageClient::open(client);
        let shards_usage = usage.shards_usage_audit().await;
        assert_eq!(shards_usage.by_shard.len(), 1);
        assert_eq!(
            shards_usage.by_shard.get(&shard_id).unwrap().leaked_bytes,
            3
        );
    }
}