1use std::collections::btree_map::Entry;
13use std::collections::{BTreeMap, BTreeSet};
14use std::fmt;
15use std::pin::pin;
16use std::str::FromStr;
17use std::sync::{Arc, Mutex};
18
19use anyhow::anyhow;
20use bytes::{BufMut, Bytes};
21use differential_dataflow::difference::{IsZero, Semigroup};
22use differential_dataflow::lattice::Lattice;
23use differential_dataflow::trace::Description;
24use futures_util::{StreamExt, TryStreamExt};
25use mz_ore::cast::CastFrom;
26use mz_ore::metrics::MetricsRegistry;
27use mz_ore::now::SYSTEM_TIME;
28use mz_ore::url::SensitiveUrl;
29use mz_persist::indexed::encoding::BlobTraceBatchPart;
30use mz_persist_types::codec_impls::TodoSchema;
31use mz_persist_types::{Codec, Codec64, Opaque};
32use mz_proto::RustType;
33use prost::Message;
34use serde_json::json;
35
36use crate::async_runtime::IsolatedRuntime;
37use crate::cache::StateCache;
38use crate::cli::args::{NO_COMMIT, READ_ALL_BUILD_INFO, StateArgs, make_blob, make_consensus};
39use crate::error::CodecConcreteType;
40use crate::fetch::EncodedPart;
41use crate::internal::encoding::{Rollup, UntypedState};
42use crate::internal::paths::{
43 BlobKey, BlobKeyPrefix, PartialBatchKey, PartialBlobKey, PartialRollupKey, WriterKey,
44};
45use crate::internal::state::{BatchPart, ProtoRollup, ProtoStateDiff, State};
46use crate::rpc::NoopPubSubSender;
47use crate::usage::{HumanBytes, StorageUsageClient};
48use crate::{Metrics, PersistClient, PersistConfig, ShardId};
49
50#[derive(Debug, clap::Args)]
52pub struct InspectArgs {
53 #[clap(subcommand)]
54 command: Command,
55}
56
57#[derive(Debug, clap::Subcommand)]
59pub(crate) enum Command {
60 State(StateArgs),
62
63 StateRollup(StateRollupArgs),
65
66 StateRollups(StateArgs),
68
69 BlobCount(BlobArgs),
71
72 BlobBatchPart(BlobBatchPartArgs),
74
75 ConsolidatedSize(StateArgs),
77
78 UnreferencedBlobs(StateArgs),
80
81 ShardStats(BlobArgs),
83
84 BlobUsage(StateArgs),
86
87 #[clap(verbatim_doc_comment)]
111 StateDiff(StateArgs),
112}
113
114pub async fn run(command: InspectArgs) -> Result<(), anyhow::Error> {
116 match command.command {
117 Command::State(args) => {
118 let state = fetch_latest_state(&args).await?;
119 println!(
120 "{}",
121 serde_json::to_string_pretty(&state).expect("unserializable state")
122 );
123 }
124 Command::StateRollup(args) => {
125 let state_rollup = fetch_state_rollup(&args).await?;
126 println!(
127 "{}",
128 serde_json::to_string_pretty(&state_rollup).expect("unserializable state")
129 );
130 }
131 Command::StateRollups(args) => {
132 let state_rollups = fetch_state_rollups(&args).await?;
133 println!(
134 "{}",
135 serde_json::to_string_pretty(&state_rollups).expect("unserializable state")
136 );
137 }
138 Command::StateDiff(args) => {
139 let states = fetch_state_diffs(&args).await?;
140 for window in states.windows(2) {
141 println!(
142 "{}",
143 json!({
144 "previous": window[0],
145 "new": window[1]
146 })
147 );
148 }
149 }
150 Command::BlobCount(args) => {
151 let blob_counts = blob_counts(&args.blob_uri).await?;
152 println!("{}", json!(blob_counts));
153 }
154 Command::BlobBatchPart(args) => {
155 let shard_id = ShardId::from_str(&args.shard_id).expect("invalid shard id");
156 let updates = blob_batch_part(&args.blob_uri, shard_id, args.key, args.limit).await?;
157 println!("{}", json!(updates));
158 }
159 Command::ConsolidatedSize(args) => {
160 let () = consolidated_size(&args).await?;
161 }
162 Command::UnreferencedBlobs(args) => {
163 let unreferenced_blobs = unreferenced_blobs(&args).await?;
164 println!("{}", json!(unreferenced_blobs));
165 }
166 Command::BlobUsage(args) => {
167 let () = blob_usage(&args).await?;
168 }
169 Command::ShardStats(args) => {
170 shard_stats(&args.blob_uri).await?;
171 }
172 }
173
174 Ok(())
175}
176
177#[derive(Debug, Clone, clap::Parser)]
179pub struct StateRollupArgs {
180 #[clap(flatten)]
181 pub(crate) state: StateArgs,
182
183 #[clap(long)]
185 pub(crate) rollup_key: Option<String>,
186}
187
188pub async fn fetch_latest_state(args: &StateArgs) -> Result<impl serde::Serialize, anyhow::Error> {
190 let shard_id = args.shard_id();
191 let state_versions = args.open().await?;
192 let versions = state_versions
193 .fetch_recent_live_diffs::<u64>(&shard_id)
194 .await;
195 let state = state_versions
196 .fetch_current_state::<u64>(&shard_id, versions.0.clone())
197 .await;
198 Ok(Rollup::from_untyped_state_without_diffs(state).into_proto())
199}
200
201pub async fn fetch_state_rollup(
204 args: &StateRollupArgs,
205) -> Result<impl serde::Serialize, anyhow::Error> {
206 let shard_id = args.state.shard_id();
207 let state_versions = args.state.open().await?;
208
209 let rollup_key = if let Some(rollup_key) = &args.rollup_key {
210 PartialRollupKey(rollup_key.to_owned())
211 } else {
212 let latest_state = state_versions.consensus.head(&shard_id.to_string()).await?;
213 let diff_buf = latest_state.ok_or_else(|| anyhow!("unknown shard"))?;
214 let diff = ProtoStateDiff::decode(diff_buf.data).expect("invalid encoded diff");
215 PartialRollupKey(diff.latest_rollup_key)
216 };
217 let rollup_buf = state_versions
218 .blob
219 .get(&rollup_key.complete(&shard_id))
220 .await?
221 .expect("fetching the specified state rollup");
222 let proto = ProtoRollup::decode(rollup_buf).expect("invalid encoded state");
223 Ok(proto)
224}
225
226pub async fn fetch_state_rollups(args: &StateArgs) -> Result<impl serde::Serialize, anyhow::Error> {
228 let shard_id = args.shard_id();
229 let state_versions = args.open().await?;
230
231 let mut rollup_keys = BTreeSet::new();
232 let mut state_iter = state_versions
233 .fetch_all_live_states::<u64>(shard_id)
234 .await
235 .expect("requested shard should exist")
236 .check_ts_codec()?;
237 while let Some(v) = state_iter.next(|_| {}) {
238 for rollup in v.collections.rollups.values() {
239 rollup_keys.insert(rollup.key.clone());
240 }
241 }
242
243 if rollup_keys.is_empty() {
244 return Err(anyhow!("unknown shard"));
245 }
246
247 let mut rollup_states = BTreeMap::new();
248 for key in rollup_keys {
249 let rollup_buf = state_versions
250 .blob
251 .get(&key.complete(&shard_id))
252 .await
253 .unwrap();
254 if let Some(rollup_buf) = rollup_buf {
255 let proto = ProtoRollup::decode(rollup_buf).expect("invalid encoded state");
256 rollup_states.insert(key.to_string(), proto);
257 }
258 }
259
260 Ok(rollup_states)
261}
262
263pub async fn fetch_state_diffs(
265 args: &StateArgs,
266) -> Result<Vec<impl serde::Serialize>, anyhow::Error> {
267 let shard_id = args.shard_id();
268 let state_versions = args.open().await?;
269
270 let mut live_states = vec![];
271 let mut state_iter = state_versions
272 .fetch_all_live_states::<u64>(shard_id)
273 .await
274 .expect("requested shard should exist")
275 .check_ts_codec()?;
276 while let Some(_) = state_iter.next(|_| {}) {
277 live_states.push(state_iter.into_rollup_proto_without_diffs());
278 }
279
280 Ok(live_states)
281}
282
283#[derive(Debug, Clone, clap::Parser)]
285pub struct BlobBatchPartArgs {
286 #[clap(long)]
288 shard_id: String,
289
290 #[clap(long)]
292 key: String,
293
294 #[clap(long)]
300 blob_uri: SensitiveUrl,
301
302 #[clap(long, default_value = "18446744073709551615")]
304 limit: usize,
305}
306
307#[derive(Debug, serde::Serialize)]
308struct BatchPartOutput {
309 desc: Description<u64>,
310 updates: Vec<BatchPartUpdate>,
311}
312
313#[derive(Debug, serde::Serialize)]
314struct BatchPartUpdate {
315 k: String,
316 v: String,
317 t: u64,
318 d: i64,
319}
320
321#[derive(PartialOrd, Ord, PartialEq, Eq)]
322struct PrettyBytes<'a>(&'a [u8]);
323
324impl fmt::Debug for PrettyBytes<'_> {
325 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
326 match std::str::from_utf8(self.0) {
327 Ok(x) => fmt::Debug::fmt(x, f),
328 Err(_) => fmt::Debug::fmt(self.0, f),
329 }
330 }
331}
332
333pub async fn blob_batch_part(
335 blob_uri: &SensitiveUrl,
336 shard_id: ShardId,
337 partial_key: String,
338 limit: usize,
339) -> Result<impl serde::Serialize, anyhow::Error> {
340 let cfg = PersistConfig::new_default_configs(&READ_ALL_BUILD_INFO, SYSTEM_TIME.clone());
341 let metrics = Arc::new(Metrics::new(&cfg, &MetricsRegistry::new()));
342 let blob = make_blob(&cfg, blob_uri, NO_COMMIT, Arc::clone(&metrics)).await?;
343
344 let key = PartialBatchKey(partial_key);
345 let buf = blob
346 .get(&*key.complete(&shard_id))
347 .await
348 .expect("blob exists")
349 .expect("part exists");
350 let parsed = BlobTraceBatchPart::<u64>::decode(&buf, &metrics.columnar).expect("decodable");
351 let desc = parsed.desc.clone();
352
353 let encoded_part = EncodedPart::new(
354 metrics.read.snapshot.clone(),
355 parsed.desc.clone(),
356 &key.0,
357 None,
358 parsed,
359 );
360 let mut out = BatchPartOutput {
361 desc,
362 updates: Vec::new(),
363 };
364 let records = encoded_part.normalize(&metrics.columnar);
365 for ((k, v), t, d) in records
366 .records()
367 .expect("only implemented for records")
368 .iter()
369 {
370 if out.updates.len() > limit {
371 break;
372 }
373 out.updates.push(BatchPartUpdate {
374 k: format!("{:?}", PrettyBytes(k)),
375 v: format!("{:?}", PrettyBytes(v)),
376 t: u64::from_le_bytes(t),
377 d: i64::from_le_bytes(d),
378 });
379 }
380
381 Ok(out)
382}
383
384async fn consolidated_size(args: &StateArgs) -> Result<(), anyhow::Error> {
385 let shard_id = args.shard_id();
386 let state_versions = args.open().await?;
387 let versions = state_versions
388 .fetch_recent_live_diffs::<u64>(&shard_id)
389 .await;
390 let state = state_versions
391 .fetch_current_state::<u64>(&shard_id, versions.0.clone())
392 .await;
393 let state = state.check_ts_codec(&shard_id)?;
394 let shard_metrics = state_versions.metrics.shards.shard(&shard_id, "unknown");
395 let as_of = state.upper().borrow();
397
398 let mut updates = Vec::new();
399 for batch in state.collections.trace.batches() {
400 let mut part_stream =
401 pin!(batch.part_stream(shard_id, &*state_versions.blob, &*state_versions.metrics));
402 while let Some(part) = part_stream.try_next().await? {
403 tracing::info!("fetching {}", part.printable_name());
404 let encoded_part = EncodedPart::fetch(
405 &shard_id,
406 &*state_versions.blob,
407 &state_versions.metrics,
408 &shard_metrics,
409 &state_versions.metrics.read.snapshot,
410 &batch.desc,
411 &part,
412 )
413 .await
414 .expect("part exists");
415 let part = encoded_part.normalize(&state_versions.metrics.columnar);
416 for ((k, v), t, d) in part.records().expect("codec records").iter() {
417 let mut t = <u64 as Codec64>::decode(t);
418 t.advance_by(as_of);
419 let d = <i64 as Codec64>::decode(d);
420 updates.push(((k.to_owned(), v.to_owned()), t, d));
421 }
422 }
423 }
424
425 let bytes: usize = updates.iter().map(|((k, v), _, _)| k.len() + v.len()).sum();
426 println!("before: {} updates {} bytes", updates.len(), bytes);
427 differential_dataflow::consolidation::consolidate_updates(&mut updates);
428 let bytes: usize = updates.iter().map(|((k, v), _, _)| k.len() + v.len()).sum();
429 println!("after : {} updates {} bytes", updates.len(), bytes);
430
431 Ok(())
432}
433
434#[derive(Debug, Clone, clap::Parser)]
436pub struct BlobArgs {
437 #[clap(long)]
443 blob_uri: SensitiveUrl,
444}
445
446#[derive(Debug, Default, serde::Serialize)]
447struct BlobCounts {
448 batch_part_count: usize,
449 batch_part_bytes: usize,
450 rollup_count: usize,
451 rollup_bytes: usize,
452}
453
454pub async fn blob_counts(blob_uri: &SensitiveUrl) -> Result<impl serde::Serialize, anyhow::Error> {
456 let cfg = PersistConfig::new_default_configs(&READ_ALL_BUILD_INFO, SYSTEM_TIME.clone());
457 let metrics = Arc::new(Metrics::new(&cfg, &MetricsRegistry::new()));
458 let blob = make_blob(&cfg, blob_uri, NO_COMMIT, metrics).await?;
459
460 let mut blob_counts = BTreeMap::new();
461 let () = blob
462 .list_keys_and_metadata(&BlobKeyPrefix::All.to_string(), &mut |metadata| {
463 match BlobKey::parse_ids(metadata.key) {
464 Ok((shard, PartialBlobKey::Batch(_, _))) => {
465 let blob_count = blob_counts.entry(shard).or_insert_with(BlobCounts::default);
466 blob_count.batch_part_count += 1;
467 blob_count.batch_part_bytes += usize::cast_from(metadata.size_in_bytes);
468 }
469 Ok((shard, PartialBlobKey::Rollup(_, _))) => {
470 let blob_count = blob_counts.entry(shard).or_insert_with(BlobCounts::default);
471 blob_count.rollup_count += 1;
472 blob_count.rollup_bytes += usize::cast_from(metadata.size_in_bytes);
473 }
474 Err(err) => {
475 eprintln!("error parsing blob: {}", err);
476 }
477 }
478 })
479 .await?;
480
481 Ok(blob_counts)
482}
483
484pub async fn shard_stats(blob_uri: &SensitiveUrl) -> anyhow::Result<()> {
486 let cfg = PersistConfig::new_default_configs(&READ_ALL_BUILD_INFO, SYSTEM_TIME.clone());
487 let metrics = Arc::new(Metrics::new(&cfg, &MetricsRegistry::new()));
488 let blob = make_blob(&cfg, blob_uri, NO_COMMIT, metrics).await?;
489
490 let mut rollup_keys = BTreeMap::new();
492 blob.list_keys_and_metadata(&BlobKeyPrefix::All.to_string(), &mut |metadata| {
493 if let Ok((shard, PartialBlobKey::Rollup(seqno, rollup_id))) =
494 BlobKey::parse_ids(metadata.key)
495 {
496 let key = (seqno, rollup_id);
497 match rollup_keys.entry(shard) {
498 Entry::Vacant(v) => {
499 v.insert(key);
500 }
501 Entry::Occupied(o) => {
502 if key.0 > o.get().0 {
503 *o.into_mut() = key;
504 }
505 }
506 };
507 }
508 })
509 .await?;
510
511 println!(
512 "shard,bytes,parts,runs,batches,empty_batches,longest_run,byte_width,leased_readers,critical_readers,writers"
513 );
514 for (shard, (seqno, rollup)) in rollup_keys {
515 let rollup_key = PartialRollupKey::new(seqno, &rollup).complete(&shard);
516 let mut bytes = 0;
518 let mut parts = 0;
519 let mut runs = 0;
520 let mut batches = 0;
521 let mut empty_batches = 0;
522 let mut longest_run = 0;
523 let mut byte_width = 0;
527
528 let Some(rollup) = blob.get(&rollup_key).await? else {
529 continue;
531 };
532
533 let state: State<u64> =
534 UntypedState::decode(&cfg.build_version, rollup).check_ts_codec(&shard)?;
535
536 let leased_readers = state.collections.leased_readers.len();
537 let critical_readers = state.collections.critical_readers.len();
538 let writers = state.collections.writers.len();
539
540 state.collections.trace.map_batches(|b| {
541 bytes += b.encoded_size_bytes();
542 parts += b.part_count();
543 batches += 1;
544 if b.is_empty() {
545 empty_batches += 1;
546 }
547 for (_meta, run) in b.runs() {
548 let largest_part = run.iter().map(|p| p.max_part_bytes()).max().unwrap_or(0);
549 runs += 1;
550 longest_run = longest_run.max(run.len());
551 byte_width += largest_part;
552 }
553 });
554 println!(
555 "{shard},{bytes},{parts},{runs},{batches},{empty_batches},{longest_run},{byte_width},{leased_readers},{critical_readers},{writers}"
556 );
557 }
558
559 Ok(())
560}
561
562#[derive(Debug, Default, serde::Serialize)]
563struct UnreferencedBlobs {
564 batch_parts: BTreeSet<PartialBatchKey>,
565 rollups: BTreeSet<PartialRollupKey>,
566}
567
568pub async fn unreferenced_blobs(args: &StateArgs) -> Result<impl serde::Serialize, anyhow::Error> {
570 let shard_id = args.shard_id();
571 let state_versions = args.open().await?;
572
573 let mut all_parts = vec![];
574 let mut all_rollups = vec![];
575 let () = state_versions
576 .blob
577 .list_keys_and_metadata(
578 &BlobKeyPrefix::Shard(&shard_id).to_string(),
579 &mut |metadata| match BlobKey::parse_ids(metadata.key) {
580 Ok((_, PartialBlobKey::Batch(writer, part))) => {
581 all_parts.push((PartialBatchKey::new(&writer, &part), writer.clone()));
582 }
583 Ok((_, PartialBlobKey::Rollup(seqno, rollup))) => {
584 all_rollups.push(PartialRollupKey::new(seqno, &rollup));
585 }
586 Err(_) => {}
587 },
588 )
589 .await?;
590
591 let mut state_iter = state_versions
592 .fetch_all_live_states::<u64>(shard_id)
593 .await
594 .expect("requested shard should exist")
595 .check_ts_codec()?;
596
597 let mut known_parts = BTreeSet::new();
598 let mut known_rollups = BTreeSet::new();
599 let mut known_writers = BTreeSet::new();
600 while let Some(v) = state_iter.next(|_| {}) {
601 for writer_id in v.collections.writers.keys() {
602 known_writers.insert(writer_id.clone());
603 }
604 for batch in v.collections.trace.batches() {
605 let mut parts =
609 pin!(batch.part_stream(shard_id, &*state_versions.blob, &*state_versions.metrics));
610 while let Some(batch_part) = parts.next().await {
611 match &*batch_part? {
612 BatchPart::Hollow(x) => known_parts.insert(x.key.clone()),
613 BatchPart::Inline { .. } => continue,
614 };
615 }
616 }
617 for rollup in v.collections.rollups.values() {
618 known_rollups.insert(rollup.key.clone());
619 }
620 }
621
622 let mut unreferenced_blobs = UnreferencedBlobs::default();
623 let minimum_version = WriterKey::for_version(&state_versions.cfg.build_version);
626 for (part, writer) in all_parts {
627 let is_unreferenced = writer < minimum_version;
628 if is_unreferenced && !known_parts.contains(&part) {
629 unreferenced_blobs.batch_parts.insert(part);
630 }
631 }
632 for rollup in all_rollups {
633 if !known_rollups.contains(&rollup) {
634 unreferenced_blobs.rollups.insert(rollup);
635 }
636 }
637
638 Ok(unreferenced_blobs)
639}
640
641pub async fn blob_usage(args: &StateArgs) -> Result<(), anyhow::Error> {
643 let shard_id = if args.shard_id.is_empty() {
644 None
645 } else {
646 Some(args.shard_id())
647 };
648 let cfg = PersistConfig::new_default_configs(&READ_ALL_BUILD_INFO, SYSTEM_TIME.clone());
649 let metrics_registry = MetricsRegistry::new();
650 let metrics = Arc::new(Metrics::new(&cfg, &metrics_registry));
651 let consensus =
652 make_consensus(&cfg, &args.consensus_uri, NO_COMMIT, Arc::clone(&metrics)).await?;
653 let blob = make_blob(&cfg, &args.blob_uri, NO_COMMIT, Arc::clone(&metrics)).await?;
654 let isolated_runtime = Arc::new(IsolatedRuntime::new(&metrics_registry, None));
655 let state_cache = Arc::new(StateCache::new(
656 &cfg,
657 Arc::clone(&metrics),
658 Arc::new(NoopPubSubSender),
659 ));
660 let usage = StorageUsageClient::open(PersistClient::new(
661 cfg,
662 blob,
663 consensus,
664 metrics,
665 isolated_runtime,
666 state_cache,
667 Arc::new(NoopPubSubSender),
668 )?);
669
670 if let Some(shard_id) = shard_id {
671 let usage = usage.shard_usage_audit(shard_id).await;
672 println!("{}\n{}", shard_id, usage);
673 } else {
674 let usage = usage.shards_usage_audit().await;
675 let mut by_shard = usage.by_shard.iter().collect::<Vec<_>>();
676 by_shard.sort_by_key(|(_, x)| x.total_bytes());
677 by_shard.reverse();
678 for (shard_id, usage) in by_shard {
679 println!("{}\n{}\n", shard_id, usage);
680 }
681 println!("unattributable: {}", HumanBytes(usage.unattributable_bytes));
682 }
683
684 Ok(())
685}
686
687#[derive(Default, Debug, PartialEq, Eq)]
696pub(crate) struct K;
697#[derive(Default, Debug, PartialEq, Eq)]
698pub(crate) struct V;
699#[derive(Default, Debug, PartialEq, Eq)]
700struct T;
701#[derive(Default, Debug, Clone, Eq, PartialEq, PartialOrd, Ord)]
702struct D(i64);
703
704pub(crate) static KVTD_CODECS: Mutex<(String, String, String, String, Option<CodecConcreteType>)> =
705 Mutex::new((
706 String::new(),
707 String::new(),
708 String::new(),
709 String::new(),
710 None,
711 ));
712
713impl Codec for K {
714 type Storage = ();
715 type Schema = TodoSchema<K>;
716
717 fn codec_name() -> String {
718 KVTD_CODECS.lock().expect("lockable").0.clone()
719 }
720
721 fn encode<B>(&self, _buf: &mut B)
722 where
723 B: BufMut,
724 {
725 }
726
727 fn decode(_buf: &[u8], _schema: &TodoSchema<K>) -> Result<Self, String> {
728 Ok(Self)
729 }
730
731 fn encode_schema(_schema: &Self::Schema) -> Bytes {
732 Bytes::new()
733 }
734
735 fn decode_schema(buf: &Bytes) -> Self::Schema {
736 assert_eq!(*buf, Bytes::new());
737 TodoSchema::default()
738 }
739}
740
741impl Codec for V {
742 type Storage = ();
743 type Schema = TodoSchema<V>;
744
745 fn codec_name() -> String {
746 KVTD_CODECS.lock().expect("lockable").1.clone()
747 }
748
749 fn encode<B>(&self, _buf: &mut B)
750 where
751 B: BufMut,
752 {
753 }
754
755 fn decode(_buf: &[u8], _schema: &TodoSchema<V>) -> Result<Self, String> {
756 Ok(Self)
757 }
758
759 fn encode_schema(_schema: &Self::Schema) -> Bytes {
760 Bytes::new()
761 }
762
763 fn decode_schema(buf: &Bytes) -> Self::Schema {
764 assert_eq!(*buf, Bytes::new());
765 TodoSchema::default()
766 }
767}
768
769impl Codec for T {
770 type Storage = ();
771 type Schema = TodoSchema<T>;
772
773 fn codec_name() -> String {
774 KVTD_CODECS.lock().expect("lockable").2.clone()
775 }
776
777 fn encode<B>(&self, _buf: &mut B)
778 where
779 B: BufMut,
780 {
781 }
782
783 fn decode(_buf: &[u8], _schema: &TodoSchema<T>) -> Result<Self, String> {
784 Ok(Self)
785 }
786
787 fn encode_schema(_schema: &Self::Schema) -> Bytes {
788 Bytes::new()
789 }
790
791 fn decode_schema(buf: &Bytes) -> Self::Schema {
792 assert_eq!(*buf, Bytes::new());
793 TodoSchema::default()
794 }
795}
796
797impl Codec64 for D {
798 fn codec_name() -> String {
799 KVTD_CODECS.lock().expect("lockable").3.clone()
800 }
801
802 fn encode(&self) -> [u8; 8] {
803 [0; 8]
804 }
805
806 fn decode(_buf: [u8; 8]) -> Self {
807 Self(0)
808 }
809}
810
811impl Semigroup for D {
812 fn plus_equals(&mut self, _rhs: &Self) {}
813}
814
815impl IsZero for D {
816 fn is_zero(&self) -> bool {
817 false
818 }
819}
820
821pub(crate) static FAKE_OPAQUE_CODEC: Mutex<String> = Mutex::new(String::new());
822
823#[derive(Debug, Clone, PartialEq)]
824pub(crate) struct O([u8; 8]);
825
826impl Codec64 for O {
827 fn codec_name() -> String {
828 FAKE_OPAQUE_CODEC.lock().expect("lockable").clone()
829 }
830
831 fn encode(&self) -> [u8; 8] {
832 self.0
833 }
834
835 fn decode(buf: [u8; 8]) -> Self {
836 Self(buf)
837 }
838}
839
840impl Opaque for O {
841 fn initial() -> Self {
842 Self([0; 8])
843 }
844}