1use std::collections::btree_map::Entry;
13use std::collections::{BTreeMap, BTreeSet};
14use std::pin::pin;
15use std::str::FromStr;
16use std::sync::{Arc, Mutex};
17
18use anyhow::anyhow;
19use bytes::{BufMut, Bytes};
20use differential_dataflow::lattice::Lattice;
21use differential_dataflow::trace::Description;
22use futures_util::{StreamExt, TryStreamExt};
23use mz_ore::cast::CastFrom;
24use mz_ore::metrics::MetricsRegistry;
25use mz_ore::now::SYSTEM_TIME;
26use mz_ore::url::SensitiveUrl;
27use mz_persist::indexed::encoding::BlobTraceBatchPart;
28use mz_persist_types::codec_impls::TodoSchema;
29use mz_persist_types::{Codec, Codec64, Opaque};
30use mz_proto::RustType;
31use prost::Message;
32use serde_json::json;
33
34use crate::async_runtime::IsolatedRuntime;
35use crate::cache::StateCache;
36use crate::cli::args::{NO_COMMIT, READ_ALL_BUILD_INFO, StateArgs, make_blob, make_consensus};
37use crate::error::CodecConcreteType;
38use crate::fetch::{EncodedPart, FetchConfig};
39use crate::internal::encoding::{Rollup, UntypedState};
40use crate::internal::paths::{
41 BlobKey, BlobKeyPrefix, PartialBatchKey, PartialBlobKey, PartialRollupKey, WriterKey,
42};
43use crate::internal::state::{BatchPart, ProtoRollup, ProtoStateDiff, State};
44use crate::rpc::NoopPubSubSender;
45use crate::usage::{HumanBytes, StorageUsageClient};
46use crate::{Metrics, PersistClient, PersistConfig, ShardId};
47
/// Arguments for the inspection commands.
///
/// Wraps the [`Command`] subcommand that [`run`] dispatches on.
#[derive(Debug, clap::Args)]
pub struct InspectArgs {
    // The inspection subcommand selected on the command line.
    #[clap(subcommand)]
    command: Command,
}
54
/// The inspection subcommands dispatched by [`run`].
#[derive(Debug, clap::Subcommand)]
pub(crate) enum Command {
    /// Prints the latest state of a shard as pretty-printed JSON
    State(StateArgs),

    /// Prints a single state rollup of a shard as pretty-printed JSON
    StateRollup(StateRollupArgs),

    /// Prints every rollup referenced by the live states of a shard as pretty-printed JSON
    StateRollups(StateArgs),

    /// Prints per-shard counts and byte sizes of batch part and rollup blobs as JSON
    BlobCount(BlobArgs),

    /// Prints the updates stored in one batch part blob as JSON
    BlobBatchPart(BlobBatchPartArgs),

    /// Prints the update count and key/value byte size of a shard before and after consolidation
    ConsolidatedSize(StateArgs),

    /// Prints the batch part and rollup blobs of a shard that are considered unreferenced as JSON
    UnreferencedBlobs(StateArgs),

    /// Prints one CSV row of batch, reader and writer statistics per shard found in blob storage
    ShardStats(BlobArgs),

    /// Prints a blob usage audit for one shard, or for all shards when the shard id is empty
    BlobUsage(StateArgs),

    /// Prints each pair of consecutive live states of a shard as a JSON object
    /// with `previous` and `new` keys
    #[clap(verbatim_doc_comment)]
    StateDiff(StateArgs),
}
111
112pub async fn run(command: InspectArgs) -> Result<(), anyhow::Error> {
114 match command.command {
115 Command::State(args) => {
116 let state = fetch_latest_state(&args).await?;
117 println!(
118 "{}",
119 serde_json::to_string_pretty(&state).expect("unserializable state")
120 );
121 }
122 Command::StateRollup(args) => {
123 let state_rollup = fetch_state_rollup(&args).await?;
124 println!(
125 "{}",
126 serde_json::to_string_pretty(&state_rollup).expect("unserializable state")
127 );
128 }
129 Command::StateRollups(args) => {
130 let state_rollups = fetch_state_rollups(&args).await?;
131 println!(
132 "{}",
133 serde_json::to_string_pretty(&state_rollups).expect("unserializable state")
134 );
135 }
136 Command::StateDiff(args) => {
137 let states = fetch_state_diffs(&args).await?;
138 for window in states.windows(2) {
139 println!(
140 "{}",
141 json!({
142 "previous": window[0],
143 "new": window[1]
144 })
145 );
146 }
147 }
148 Command::BlobCount(args) => {
149 let blob_counts = blob_counts(&args.blob_uri).await?;
150 println!("{}", json!(blob_counts));
151 }
152 Command::BlobBatchPart(args) => {
153 let shard_id = ShardId::from_str(&args.shard_id).expect("invalid shard id");
154 let updates = blob_batch_part(&args.blob_uri, shard_id, args.key, args.limit).await?;
155 println!("{}", json!(updates));
156 }
157 Command::ConsolidatedSize(args) => {
158 let () = consolidated_size(&args).await?;
159 }
160 Command::UnreferencedBlobs(args) => {
161 let unreferenced_blobs = unreferenced_blobs(&args).await?;
162 println!("{}", json!(unreferenced_blobs));
163 }
164 Command::BlobUsage(args) => {
165 let () = blob_usage(&args).await?;
166 }
167 Command::ShardStats(args) => {
168 shard_stats(&args.blob_uri).await?;
169 }
170 }
171
172 Ok(())
173}
174
/// Arguments for fetching a single state rollup.
#[derive(Debug, Clone, clap::Parser)]
pub struct StateRollupArgs {
    // Shared shard/state arguments, flattened into this command's flags.
    #[clap(flatten)]
    pub(crate) state: StateArgs,

    /// Partial key of the rollup to fetch. When omitted, the `latest_rollup_key`
    /// of the state diff at the head of consensus is used.
    #[clap(long)]
    pub(crate) rollup_key: Option<String>,
}
185
186pub async fn fetch_latest_state(args: &StateArgs) -> Result<impl serde::Serialize, anyhow::Error> {
188 let shard_id = args.shard_id();
189 let state_versions = args.open().await?;
190 let versions = state_versions
191 .fetch_recent_live_diffs::<u64>(&shard_id)
192 .await;
193 let state = state_versions
194 .fetch_current_state::<u64>(&shard_id, versions.0.clone())
195 .await;
196 Ok(Rollup::from_untyped_state_without_diffs(state).into_proto())
197}
198
199pub async fn fetch_state_rollup(
202 args: &StateRollupArgs,
203) -> Result<impl serde::Serialize, anyhow::Error> {
204 let shard_id = args.state.shard_id();
205 let state_versions = args.state.open().await?;
206
207 let rollup_key = if let Some(rollup_key) = &args.rollup_key {
208 PartialRollupKey(rollup_key.to_owned())
209 } else {
210 let latest_state = state_versions.consensus.head(&shard_id.to_string()).await?;
211 let diff_buf = latest_state.ok_or_else(|| anyhow!("unknown shard"))?;
212 let diff = ProtoStateDiff::decode(diff_buf.data).expect("invalid encoded diff");
213 PartialRollupKey(diff.latest_rollup_key)
214 };
215 let rollup_buf = state_versions
216 .blob
217 .get(&rollup_key.complete(&shard_id))
218 .await?
219 .expect("fetching the specified state rollup");
220 let proto = ProtoRollup::decode(rollup_buf).expect("invalid encoded state");
221 Ok(proto)
222}
223
224pub async fn fetch_state_rollups(args: &StateArgs) -> Result<impl serde::Serialize, anyhow::Error> {
226 let shard_id = args.shard_id();
227 let state_versions = args.open().await?;
228
229 let mut rollup_keys = BTreeSet::new();
230 let mut state_iter = state_versions
231 .fetch_all_live_states::<u64>(shard_id)
232 .await
233 .expect("requested shard should exist")
234 .check_ts_codec()?;
235 while let Some(v) = state_iter.next(|_| {}) {
236 for rollup in v.collections.rollups.values() {
237 rollup_keys.insert(rollup.key.clone());
238 }
239 }
240
241 if rollup_keys.is_empty() {
242 return Err(anyhow!("unknown shard"));
243 }
244
245 let mut rollup_states = BTreeMap::new();
246 for key in rollup_keys {
247 let rollup_buf = state_versions
248 .blob
249 .get(&key.complete(&shard_id))
250 .await
251 .unwrap();
252 if let Some(rollup_buf) = rollup_buf {
253 let proto = ProtoRollup::decode(rollup_buf).expect("invalid encoded state");
254 rollup_states.insert(key.to_string(), proto);
255 }
256 }
257
258 Ok(rollup_states)
259}
260
261pub async fn fetch_state_diffs(
263 args: &StateArgs,
264) -> Result<Vec<impl serde::Serialize>, anyhow::Error> {
265 let shard_id = args.shard_id();
266 let state_versions = args.open().await?;
267
268 let mut live_states = vec![];
269 let mut state_iter = state_versions
270 .fetch_all_live_states::<u64>(shard_id)
271 .await
272 .expect("requested shard should exist")
273 .check_ts_codec()?;
274 while let Some(_) = state_iter.next(|_| {}) {
275 live_states.push(state_iter.into_rollup_proto_without_diffs());
276 }
277
278 Ok(live_states)
279}
280
/// Arguments for printing the contents of a single batch part.
#[derive(Debug, Clone, clap::Parser)]
pub struct BlobBatchPartArgs {
    /// Id of the shard the batch part belongs to
    #[clap(long)]
    shard_id: String,

    /// Partial key of the batch part; it is completed with the shard id to
    /// form the blob key
    #[clap(long)]
    key: String,

    /// URI of the blob store to read from
    #[clap(long)]
    blob_uri: SensitiveUrl,

    /// Cap on how many updates are output; the default is effectively unbounded
    #[clap(long, default_value = "18446744073709551615")]
    limit: usize,
}
304
/// Serializable result of [`blob_batch_part`]: the description of the batch
/// part together with the updates read from it.
#[derive(Debug, serde::Serialize)]
struct BatchPartOutput {
    // Description decoded from the batch part.
    desc: Description<u64>,
    // Updates read from the part, in iteration order.
    updates: Vec<BatchPartUpdate>,
}
310
/// One update of a batch part as rendered by [`blob_batch_part`].
#[derive(Debug, serde::Serialize)]
struct BatchPartUpdate {
    // Key, rendered with `to_string`.
    k: String,
    // Value, rendered with `to_string`.
    v: String,
    // Timestamp, decoded from little-endian bytes.
    t: u64,
    // Diff, decoded from little-endian bytes.
    d: i64,
}
318
319pub async fn blob_batch_part(
321 blob_uri: &SensitiveUrl,
322 shard_id: ShardId,
323 partial_key: String,
324 limit: usize,
325) -> Result<impl serde::Serialize, anyhow::Error> {
326 let cfg = PersistConfig::new_default_configs(&READ_ALL_BUILD_INFO, SYSTEM_TIME.clone());
327 let metrics = Arc::new(Metrics::new(&cfg, &MetricsRegistry::new()));
328 let blob = make_blob(&cfg, blob_uri, NO_COMMIT, Arc::clone(&metrics)).await?;
329
330 let key = PartialBatchKey(partial_key);
331 let buf = blob
332 .get(&*key.complete(&shard_id))
333 .await
334 .expect("blob exists")
335 .expect("part exists");
336 let parsed = BlobTraceBatchPart::<u64>::decode(&buf, &metrics.columnar).expect("decodable");
337 let desc = parsed.desc.clone();
338
339 let encoded_part = EncodedPart::new(
340 &FetchConfig::from_persist_config(&cfg),
341 metrics.read.snapshot.clone(),
342 parsed.desc.clone(),
343 &key.0,
344 None,
345 parsed,
346 );
347 let mut out = BatchPartOutput {
348 desc,
349 updates: Vec::new(),
350 };
351 let records = encoded_part
352 .updates()
353 .as_part()
354 .ok_or_else(|| anyhow!("expected structured data"))?
355 .as_ord();
356 for (k, v, t, d) in records.iter() {
357 if out.updates.len() > limit {
358 break;
359 }
360 out.updates.push(BatchPartUpdate {
361 k: k.to_string(),
362 v: v.to_string(),
363 t: u64::from_le_bytes(t),
364 d: i64::from_le_bytes(d),
365 });
366 }
367
368 Ok(out)
369}
370
371async fn consolidated_size(args: &StateArgs) -> Result<(), anyhow::Error> {
372 let shard_id = args.shard_id();
373 let state_versions = args.open().await?;
374 let cfg = &state_versions.cfg;
375 let versions = state_versions
376 .fetch_recent_live_diffs::<u64>(&shard_id)
377 .await;
378 let state = state_versions
379 .fetch_current_state::<u64>(&shard_id, versions.0.clone())
380 .await;
381 let state = state.check_ts_codec(&shard_id)?;
382 let shard_metrics = state_versions.metrics.shards.shard(&shard_id, "unknown");
383 let as_of = state.upper().borrow();
385
386 let mut parts = Vec::new();
387 for batch in state.collections.trace.batches() {
388 let mut part_stream =
389 pin!(batch.part_stream(shard_id, &*state_versions.blob, &*state_versions.metrics));
390 while let Some(part) = part_stream.try_next().await? {
391 tracing::info!("fetching {}", part.printable_name());
392 let encoded_part = EncodedPart::fetch(
393 &FetchConfig::from_persist_config(cfg),
394 &shard_id,
395 &*state_versions.blob,
396 &state_versions.metrics,
397 &shard_metrics,
398 &state_versions.metrics.read.snapshot,
399 &batch.desc,
400 &part,
401 )
402 .await
403 .expect("part exists");
404 let part = encoded_part.updates();
405 let part = part
406 .as_part()
407 .ok_or_else(|| anyhow!("expected structured data"))?
408 .as_ord();
409 parts.push(part);
410 }
411 }
412
413 let mut updates = vec![];
414 for part in &parts {
415 for (k, v, t, d) in part.iter() {
416 let mut t = <u64 as Codec64>::decode(t);
417 t.advance_by(as_of);
418 let d = <i64 as Codec64>::decode(d);
419 updates.push(((k, v), t, d));
420 }
421 }
422
423 let bytes: usize = updates
424 .iter()
425 .map(|((k, v), _, _)| k.goodbytes() + v.goodbytes())
426 .sum();
427 println!("before: {} updates {} bytes", updates.len(), bytes);
428 differential_dataflow::consolidation::consolidate_updates(&mut updates);
429 let bytes: usize = updates
430 .iter()
431 .map(|((k, v), _, _)| k.goodbytes() + v.goodbytes())
432 .sum();
433 println!("after : {} updates {} bytes", updates.len(), bytes);
434
435 Ok(())
436}
437
/// Arguments for commands that only need access to blob storage.
#[derive(Debug, Clone, clap::Parser)]
pub struct BlobArgs {
    /// URI of the blob store to read from
    #[clap(long)]
    blob_uri: SensitiveUrl,
}
449
/// Per-shard tallies accumulated by [`blob_counts`].
#[derive(Debug, Default, serde::Serialize)]
struct BlobCounts {
    // Number of batch part blobs seen for the shard.
    batch_part_count: usize,
    // Sum of the reported sizes of those batch part blobs, in bytes.
    batch_part_bytes: usize,
    // Number of rollup blobs seen for the shard.
    rollup_count: usize,
    // Sum of the reported sizes of those rollup blobs, in bytes.
    rollup_bytes: usize,
}
457
458pub async fn blob_counts(blob_uri: &SensitiveUrl) -> Result<impl serde::Serialize, anyhow::Error> {
460 let cfg = PersistConfig::new_default_configs(&READ_ALL_BUILD_INFO, SYSTEM_TIME.clone());
461 let metrics = Arc::new(Metrics::new(&cfg, &MetricsRegistry::new()));
462 let blob = make_blob(&cfg, blob_uri, NO_COMMIT, metrics).await?;
463
464 let mut blob_counts = BTreeMap::new();
465 let () = blob
466 .list_keys_and_metadata(&BlobKeyPrefix::All.to_string(), &mut |metadata| {
467 match BlobKey::parse_ids(metadata.key) {
468 Ok((shard, PartialBlobKey::Batch(_, _))) => {
469 let blob_count = blob_counts.entry(shard).or_insert_with(BlobCounts::default);
470 blob_count.batch_part_count += 1;
471 blob_count.batch_part_bytes += usize::cast_from(metadata.size_in_bytes);
472 }
473 Ok((shard, PartialBlobKey::Rollup(_, _))) => {
474 let blob_count = blob_counts.entry(shard).or_insert_with(BlobCounts::default);
475 blob_count.rollup_count += 1;
476 blob_count.rollup_bytes += usize::cast_from(metadata.size_in_bytes);
477 }
478 Err(err) => {
479 eprintln!("error parsing blob: {}", err);
480 }
481 }
482 })
483 .await?;
484
485 Ok(blob_counts)
486}
487
488pub async fn shard_stats(blob_uri: &SensitiveUrl) -> anyhow::Result<()> {
490 let cfg = PersistConfig::new_default_configs(&READ_ALL_BUILD_INFO, SYSTEM_TIME.clone());
491 let metrics = Arc::new(Metrics::new(&cfg, &MetricsRegistry::new()));
492 let blob = make_blob(&cfg, blob_uri, NO_COMMIT, metrics).await?;
493
494 let mut rollup_keys = BTreeMap::new();
496 blob.list_keys_and_metadata(&BlobKeyPrefix::All.to_string(), &mut |metadata| {
497 if let Ok((shard, PartialBlobKey::Rollup(seqno, rollup_id))) =
498 BlobKey::parse_ids(metadata.key)
499 {
500 let key = (seqno, rollup_id);
501 match rollup_keys.entry(shard) {
502 Entry::Vacant(v) => {
503 v.insert(key);
504 }
505 Entry::Occupied(o) => {
506 if key.0 > o.get().0 {
507 *o.into_mut() = key;
508 }
509 }
510 };
511 }
512 })
513 .await?;
514
515 println!(
516 "shard,bytes,parts,runs,batches,empty_batches,longest_run,byte_width,leased_readers,critical_readers,writers"
517 );
518 for (shard, (seqno, rollup)) in rollup_keys {
519 let rollup_key = PartialRollupKey::new(seqno, &rollup).complete(&shard);
520 let mut bytes = 0;
522 let mut parts = 0;
523 let mut runs = 0;
524 let mut batches = 0;
525 let mut empty_batches = 0;
526 let mut longest_run = 0;
527 let mut byte_width = 0;
531
532 let Some(rollup) = blob.get(&rollup_key).await? else {
533 continue;
535 };
536
537 let state: State<u64> =
538 UntypedState::decode(&cfg.build_version, rollup).check_ts_codec(&shard)?;
539
540 let leased_readers = state.collections.leased_readers.len();
541 let critical_readers = state.collections.critical_readers.len();
542 let writers = state.collections.writers.len();
543
544 state.collections.trace.map_batches(|b| {
545 bytes += b.encoded_size_bytes();
546 parts += b.part_count();
547 batches += 1;
548 if b.is_empty() {
549 empty_batches += 1;
550 }
551 for (_meta, run) in b.runs() {
552 let largest_part = run.iter().map(|p| p.max_part_bytes()).max().unwrap_or(0);
553 runs += 1;
554 longest_run = longest_run.max(run.len());
555 byte_width += largest_part;
556 }
557 });
558 println!(
559 "{shard},{bytes},{parts},{runs},{batches},{empty_batches},{longest_run},{byte_width},{leased_readers},{critical_readers},{writers}"
560 );
561 }
562
563 Ok(())
564}
565
/// Serializable result of [`unreferenced_blobs`].
#[derive(Debug, Default, serde::Serialize)]
struct UnreferencedBlobs {
    // Keys of the batch part blobs reported as unreferenced.
    batch_parts: BTreeSet<PartialBatchKey>,
    // Keys of the rollup blobs reported as unreferenced.
    rollups: BTreeSet<PartialRollupKey>,
}
571
572pub async fn unreferenced_blobs(args: &StateArgs) -> Result<impl serde::Serialize, anyhow::Error> {
574 let shard_id = args.shard_id();
575 let state_versions = args.open().await?;
576
577 let mut all_parts = vec![];
578 let mut all_rollups = vec![];
579 let () = state_versions
580 .blob
581 .list_keys_and_metadata(
582 &BlobKeyPrefix::Shard(&shard_id).to_string(),
583 &mut |metadata| match BlobKey::parse_ids(metadata.key) {
584 Ok((_, PartialBlobKey::Batch(writer, part))) => {
585 all_parts.push((PartialBatchKey::new(&writer, &part), writer.clone()));
586 }
587 Ok((_, PartialBlobKey::Rollup(seqno, rollup))) => {
588 all_rollups.push(PartialRollupKey::new(seqno, &rollup));
589 }
590 Err(_) => {}
591 },
592 )
593 .await?;
594
595 let mut state_iter = state_versions
596 .fetch_all_live_states::<u64>(shard_id)
597 .await
598 .expect("requested shard should exist")
599 .check_ts_codec()?;
600
601 let mut known_parts = BTreeSet::new();
602 let mut known_rollups = BTreeSet::new();
603 let mut known_writers = BTreeSet::new();
604 while let Some(v) = state_iter.next(|_| {}) {
605 for writer_id in v.collections.writers.keys() {
606 known_writers.insert(writer_id.clone());
607 }
608 for batch in v.collections.trace.batches() {
609 let mut parts =
613 pin!(batch.part_stream(shard_id, &*state_versions.blob, &*state_versions.metrics));
614 while let Some(batch_part) = parts.next().await {
615 match &*batch_part? {
616 BatchPart::Hollow(x) => known_parts.insert(x.key.clone()),
617 BatchPart::Inline { .. } => continue,
618 };
619 }
620 }
621 for rollup in v.collections.rollups.values() {
622 known_rollups.insert(rollup.key.clone());
623 }
624 }
625
626 let mut unreferenced_blobs = UnreferencedBlobs::default();
627 let minimum_version = WriterKey::for_version(&state_versions.cfg.build_version);
630 for (part, writer) in all_parts {
631 let is_unreferenced = writer < minimum_version;
632 if is_unreferenced && !known_parts.contains(&part) {
633 unreferenced_blobs.batch_parts.insert(part);
634 }
635 }
636 for rollup in all_rollups {
637 if !known_rollups.contains(&rollup) {
638 unreferenced_blobs.rollups.insert(rollup);
639 }
640 }
641
642 Ok(unreferenced_blobs)
643}
644
645pub async fn blob_usage(args: &StateArgs) -> Result<(), anyhow::Error> {
647 let shard_id = if args.shard_id.is_empty() {
648 None
649 } else {
650 Some(args.shard_id())
651 };
652 let cfg = PersistConfig::new_default_configs(&READ_ALL_BUILD_INFO, SYSTEM_TIME.clone());
653 let metrics_registry = MetricsRegistry::new();
654 let metrics = Arc::new(Metrics::new(&cfg, &metrics_registry));
655 let consensus =
656 make_consensus(&cfg, &args.consensus_uri, NO_COMMIT, Arc::clone(&metrics)).await?;
657 let blob = make_blob(&cfg, &args.blob_uri, NO_COMMIT, Arc::clone(&metrics)).await?;
658 let isolated_runtime = Arc::new(IsolatedRuntime::new(&metrics_registry, None));
659 let state_cache = Arc::new(StateCache::new(
660 &cfg,
661 Arc::clone(&metrics),
662 Arc::new(NoopPubSubSender),
663 ));
664 let usage = StorageUsageClient::open(PersistClient::new(
665 cfg,
666 blob,
667 consensus,
668 metrics,
669 isolated_runtime,
670 state_cache,
671 Arc::new(NoopPubSubSender),
672 )?);
673
674 if let Some(shard_id) = shard_id {
675 let usage = usage.shard_usage_audit(shard_id).await;
676 println!("{}\n{}", shard_id, usage);
677 } else {
678 let usage = usage.shards_usage_audit().await;
679 let mut by_shard = usage.by_shard.iter().collect::<Vec<_>>();
680 by_shard.sort_by_key(|(_, x)| x.total_bytes());
681 by_shard.reverse();
682 for (shard_id, usage) in by_shard {
683 println!("{}\n{}\n", shard_id, usage);
684 }
685 println!("unattributable: {}", HumanBytes(usage.unattributable_bytes));
686 }
687
688 Ok(())
689}
690
/// Placeholder key type that holds no data. Its [`Codec`] impl encodes
/// nothing, ignores the bytes it is asked to decode, and reports the codec
/// name stored in [`KVTD_CODECS`].
#[derive(Default, Debug, PartialEq, Eq)]
pub(crate) struct K;
/// Placeholder value type that holds no data; the value counterpart of [`K`].
#[derive(Default, Debug, PartialEq, Eq)]
pub(crate) struct V;
/// Placeholder type that holds no data; its [`Codec`] impl reports the third
/// codec name stored in [`KVTD_CODECS`].
#[derive(Default, Debug, PartialEq, Eq)]
struct T;
705
/// Process-wide codec names reported by the placeholder types in this module.
///
/// Slots `.0`, `.1` and `.2` are returned by the `codec_name` impls of [`K`],
/// [`V`] and `T` respectively. All slots start out empty.
///
/// NOTE(review): the fourth `String` and the `Option<CodecConcreteType>` are
/// not read in this part of the file — presumably the diff codec name and the
/// concrete codec types; confirm against the code that sets them.
pub(crate) static KVTD_CODECS: Mutex<(String, String, String, String, Option<CodecConcreteType>)> =
    Mutex::new((
        String::new(),
        String::new(),
        String::new(),
        String::new(),
        None,
    ));
714
715impl Codec for K {
716 type Storage = ();
717 type Schema = TodoSchema<K>;
718
719 fn codec_name() -> String {
720 KVTD_CODECS.lock().expect("lockable").0.clone()
721 }
722
723 fn encode<B>(&self, _buf: &mut B)
724 where
725 B: BufMut,
726 {
727 }
728
729 fn decode(_buf: &[u8], _schema: &TodoSchema<K>) -> Result<Self, String> {
730 Ok(Self)
731 }
732
733 fn encode_schema(_schema: &Self::Schema) -> Bytes {
734 Bytes::new()
735 }
736
737 fn decode_schema(buf: &Bytes) -> Self::Schema {
738 assert_eq!(*buf, Bytes::new());
739 TodoSchema::default()
740 }
741}
742
743impl Codec for V {
744 type Storage = ();
745 type Schema = TodoSchema<V>;
746
747 fn codec_name() -> String {
748 KVTD_CODECS.lock().expect("lockable").1.clone()
749 }
750
751 fn encode<B>(&self, _buf: &mut B)
752 where
753 B: BufMut,
754 {
755 }
756
757 fn decode(_buf: &[u8], _schema: &TodoSchema<V>) -> Result<Self, String> {
758 Ok(Self)
759 }
760
761 fn encode_schema(_schema: &Self::Schema) -> Bytes {
762 Bytes::new()
763 }
764
765 fn decode_schema(buf: &Bytes) -> Self::Schema {
766 assert_eq!(*buf, Bytes::new());
767 TodoSchema::default()
768 }
769}
770
771impl Codec for T {
772 type Storage = ();
773 type Schema = TodoSchema<T>;
774
775 fn codec_name() -> String {
776 KVTD_CODECS.lock().expect("lockable").2.clone()
777 }
778
779 fn encode<B>(&self, _buf: &mut B)
780 where
781 B: BufMut,
782 {
783 }
784
785 fn decode(_buf: &[u8], _schema: &TodoSchema<T>) -> Result<Self, String> {
786 Ok(Self)
787 }
788
789 fn encode_schema(_schema: &Self::Schema) -> Bytes {
790 Bytes::new()
791 }
792
793 fn decode_schema(buf: &Bytes) -> Self::Schema {
794 assert_eq!(*buf, Bytes::new());
795 TodoSchema::default()
796 }
797}
798
/// Process-wide codec name reported by the `Codec64` impl of [`O`]; starts out
/// empty.
pub(crate) static FAKE_OPAQUE_CODEC: Mutex<String> = Mutex::new(String::new());
800
/// Placeholder opaque type that stores its 8 encoded bytes verbatim and reports
/// the codec name held in [`FAKE_OPAQUE_CODEC`].
#[derive(Debug, Clone, PartialEq)]
pub(crate) struct O([u8; 8]);
803
804impl Codec64 for O {
805 fn codec_name() -> String {
806 FAKE_OPAQUE_CODEC.lock().expect("lockable").clone()
807 }
808
809 fn encode(&self) -> [u8; 8] {
810 self.0
811 }
812
813 fn decode(buf: [u8; 8]) -> Self {
814 Self(buf)
815 }
816}
817
818impl Opaque for O {
819 fn initial() -> Self {
820 Self([0; 8])
821 }
822}