1use crate::CollectionMetadata;
13use std::collections::{BTreeMap, BTreeSet};
14use std::sync::atomic::AtomicBool;
15use std::sync::{Arc, atomic};
16use std::time::{Duration, Instant};
17
18use anyhow::bail;
19use async_trait::async_trait;
20use differential_dataflow::lattice::Lattice;
21use itertools::Itertools;
22use mz_build_info::BuildInfo;
23use mz_cluster_client::ReplicaId;
24use mz_cluster_client::client::ClusterReplicaLocation;
25use mz_ore::cast::CastFrom;
26use mz_ore::now::NowFn;
27use mz_ore::retry::{Retry, RetryState};
28use mz_ore::task::AbortOnDropHandle;
29use mz_repr::GlobalId;
30use mz_service::client::{GenericClient, Partitioned};
31use mz_service::params::GrpcClientParameters;
32use mz_service::transport;
33use mz_storage_client::client::{
34 RunIngestionCommand, RunSinkCommand, Status, StatusUpdate, StorageClient, StorageCommand,
35 StorageGrpcClient, StorageResponse,
36};
37use mz_storage_client::metrics::{InstanceMetrics, ReplicaMetrics};
38use mz_storage_types::sinks::StorageSinkDesc;
39use mz_storage_types::sources::{IngestionDescription, SourceConnection};
40use timely::order::TotalOrder;
41use timely::progress::{Antichain, Timestamp};
42use tokio::select;
43use tokio::sync::mpsc;
44use tracing::{debug, info, warn};
45use uuid::Uuid;
46
47use crate::history::CommandHistory;
48
/// Controller-side state for one storage instance: its replicas, the objects
/// scheduled on them, and the command history used to bring replicas up to
/// date.
#[derive(Debug)]
pub(crate) struct Instance<T> {
    /// Workload class label of this instance, if any. Not read by the code in
    /// this file; presumably consumed by callers — TODO confirm.
    pub workload_class: Option<String>,
    /// The replicas currently part of this instance, keyed by replica ID.
    replicas: BTreeMap<ReplicaId, Replica<T>>,
    /// Scheduling state of every ingestion that has been run and whose
    /// compaction frontier has not yet advanced to the empty antichain.
    active_ingestions: BTreeMap<GlobalId, ActiveIngestion>,
    /// Maps the ID of each source export to the ID of the ingestion that
    /// declared it.
    ingestion_exports: BTreeMap<GlobalId, GlobalId>,
    /// Scheduling state of every export (sink) that has been run and whose
    /// compaction frontier has not yet advanced to the empty antichain.
    active_exports: BTreeMap<GlobalId, ActiveExport>,
    /// Record of the commands sent through this instance; replayed to replicas
    /// when they are added.
    history: CommandHistory<T>,
    /// Instance-level metrics; also used to derive history and per-replica
    /// metrics.
    metrics: InstanceMetrics,
    /// Clock used to timestamp status updates synthesized by the instance.
    now: NowFn,
    /// Channel on which responses are handed to the controller. Each response
    /// is tagged with the replica it came from, or `None` when the instance
    /// generated it itself.
    response_tx: mpsc::UnboundedSender<(Option<ReplicaId>, StorageResponse<T>)>,
}
94
/// Scheduling state of a single ingestion.
#[derive(Debug)]
struct ActiveIngestion {
    /// The replicas this ingestion has been scheduled on.
    active_replicas: BTreeSet<ReplicaId>,
}
100
/// Scheduling state of a single export (sink).
#[derive(Debug)]
struct ActiveExport {
    /// The replicas this export has been scheduled on.
    active_replicas: BTreeSet<ReplicaId>,
}
106
107impl<T> Instance<T>
108where
109 T: Timestamp + Lattice + TotalOrder + Sync,
110 StorageGrpcClient: StorageClient<T>,
111{
112 pub fn new(
114 workload_class: Option<String>,
115 metrics: InstanceMetrics,
116 now: NowFn,
117 instance_response_tx: mpsc::UnboundedSender<(Option<ReplicaId>, StorageResponse<T>)>,
118 ) -> Self {
119 let history = CommandHistory::new(metrics.for_history());
120
121 let mut instance = Self {
122 workload_class,
123 replicas: Default::default(),
124 active_ingestions: Default::default(),
125 ingestion_exports: Default::default(),
126 active_exports: BTreeMap::new(),
127 history,
128 metrics,
129 now,
130 response_tx: instance_response_tx,
131 };
132
133 instance.send(StorageCommand::Hello {
134 nonce: Default::default(),
137 });
138
139 instance
140 }
141
142 pub fn replica_ids(&self) -> impl Iterator<Item = ReplicaId> + '_ {
144 self.replicas.keys().copied()
145 }
146
147 pub fn add_replica(&mut self, id: ReplicaId, config: ReplicaConfig) {
149 self.history.reduce();
152
153 let metrics = self.metrics.for_replica(id);
154 let replica = Replica::new(id, config, metrics, self.response_tx.clone());
155
156 self.replicas.insert(id, replica);
157
158 self.update_scheduling(false);
159
160 self.replay_commands(id);
161 }
162
163 pub fn replay_commands(&mut self, replica_id: ReplicaId) {
165 let commands = self.history.iter().cloned();
166
167 let filtered_commands = commands
168 .filter_map(|command| match command {
169 StorageCommand::RunIngestion(ingestion) => {
170 if self.is_active_replica(&ingestion.id, &replica_id) {
171 Some(StorageCommand::RunIngestion(ingestion))
172 } else {
173 None
174 }
175 }
176 StorageCommand::RunSink(sink) => {
177 if self.is_active_replica(&sink.id, &replica_id) {
178 Some(StorageCommand::RunSink(sink))
179 } else {
180 None
181 }
182 }
183 StorageCommand::AllowCompaction(id, upper) => {
184 if self.is_active_replica(&id, &replica_id) {
185 Some(StorageCommand::AllowCompaction(id, upper))
186 } else {
187 None
188 }
189 }
190 command => Some(command),
191 })
192 .collect::<Vec<_>>();
193
194 let replica = self
195 .replicas
196 .get_mut(&replica_id)
197 .expect("replica must exist");
198
199 for command in filtered_commands {
201 replica.send(command);
202 }
203 }
204
205 pub fn drop_replica(&mut self, id: ReplicaId) {
207 let replica = self.replicas.remove(&id);
208
209 let mut needs_rescheduling = false;
210 for (ingestion_id, ingestion) in self.active_ingestions.iter_mut() {
211 let was_running = ingestion.active_replicas.remove(&id);
212 if was_running {
213 tracing::debug!(
214 %ingestion_id,
215 replica_id = %id,
216 "ingestion was running on dropped replica, updating scheduling decisions"
217 );
218 needs_rescheduling = true;
219 }
220 }
221 for (export_id, export) in self.active_exports.iter_mut() {
222 let was_running = export.active_replicas.remove(&id);
223 if was_running {
224 tracing::debug!(
225 %export_id,
226 replica_id = %id,
227 "export was running on dropped replica, updating scheduling decisions"
228 );
229 needs_rescheduling = true;
230 }
231 }
232
233 tracing::info!(%id, %needs_rescheduling, "dropped replica");
234
235 if needs_rescheduling {
236 self.update_scheduling(true);
237 }
238
239 if replica.is_some() && self.replicas.is_empty() {
240 self.update_paused_statuses();
241 }
242 }
243
244 pub fn rehydrate_failed_replicas(&mut self) {
246 let replicas = self.replicas.iter();
247 let failed_replicas: Vec<_> = replicas
248 .filter_map(|(id, replica)| replica.failed().then_some(*id))
249 .collect();
250
251 for id in failed_replicas {
252 let replica = self.replicas.remove(&id).expect("must exist");
253 self.add_replica(id, replica.config);
254 }
255 }
256
257 pub fn active_ingestions(&self) -> impl Iterator<Item = &GlobalId> {
259 self.active_ingestions.keys()
260 }
261
262 pub fn active_exports(&self) -> impl Iterator<Item = &GlobalId> {
264 self.active_exports.keys()
265 }
266
267 fn update_paused_statuses(&mut self) {
269 let now = mz_ore::now::to_datetime((self.now)());
270 let make_update = |id, object_type| StatusUpdate {
271 id,
272 status: Status::Paused,
273 timestamp: now,
274 error: None,
275 hints: BTreeSet::from([format!(
276 "There is currently no replica running this {object_type}"
277 )]),
278 namespaced_errors: Default::default(),
279 replica_id: None,
280 };
281
282 self.history.reduce();
283
284 let mut status_updates = Vec::new();
285 for command in self.history.iter() {
286 match command {
287 StorageCommand::RunIngestion(ingestion) => {
288 let subsource_ids = ingestion
298 .description
299 .collection_ids()
300 .filter(|id| id != &ingestion.description.remap_collection_id);
301 for id in subsource_ids {
302 status_updates.push(make_update(id, "source"));
303 }
304 }
305 StorageCommand::RunSink(sink) => {
306 status_updates.push(make_update(sink.id, "sink"));
307 }
308 _ => (),
309 }
310 }
311
312 for update in status_updates {
313 let _ = self
317 .response_tx
318 .send((None, StorageResponse::StatusUpdate(update)));
319 }
320 }
321
322 pub fn send(&mut self, command: StorageCommand<T>) {
324 self.history.push(command.clone());
326
327 match command.clone() {
328 StorageCommand::RunIngestion(ingestion) => {
329 self.absorb_ingestion(*ingestion.clone());
333
334 for replica in self.active_replicas(&ingestion.id) {
335 replica.send(StorageCommand::RunIngestion(ingestion.clone()));
336 }
337 }
338 StorageCommand::RunSink(sink) => {
339 self.absorb_export(*sink.clone());
343
344 for replica in self.active_replicas(&sink.id) {
345 replica.send(StorageCommand::RunSink(sink.clone()));
346 }
347 }
348 StorageCommand::AllowCompaction(id, frontier) => {
349 for replica in self.active_replicas(&id) {
352 replica.send(StorageCommand::AllowCompaction(
353 id.clone(),
354 frontier.clone(),
355 ));
356 }
357
358 self.absorb_compaction(id, frontier);
359 }
360 command => {
361 for replica in self.replicas.values_mut() {
362 replica.send(command.clone());
363 }
364 }
365 }
366
367 if command.installs_objects() && self.replicas.is_empty() {
368 self.update_paused_statuses();
369 }
370 }
371
372 fn absorb_ingestion(&mut self, ingestion: RunIngestionCommand) {
377 let existing_ingestion_state = self.active_ingestions.get_mut(&ingestion.id);
378
379 for id in ingestion.description.source_exports.keys() {
381 self.ingestion_exports.insert(id.clone(), ingestion.id);
382 }
383
384 if let Some(ingestion_state) = existing_ingestion_state {
385 tracing::debug!(
390 ingestion_id = %ingestion.id,
391 active_replicas = %ingestion_state.active_replicas.iter().map(|id| id.to_string()).join(", "),
392 "updating ingestion"
393 );
394 } else {
395 let ingestion_state = ActiveIngestion {
397 active_replicas: BTreeSet::new(),
398 };
399 self.active_ingestions.insert(ingestion.id, ingestion_state);
400
401 self.update_scheduling(false);
403 }
404 }
405
406 fn absorb_export(&mut self, export: RunSinkCommand<T>) {
411 let existing_export_state = self.active_exports.get_mut(&export.id);
412
413 if let Some(export_state) = existing_export_state {
414 tracing::debug!(
419 export_id = %export.id,
420 active_replicas = %export_state.active_replicas.iter().map(|id| id.to_string()).join(", "),
421 "updating export"
422 );
423 } else {
424 let export_state = ActiveExport {
426 active_replicas: BTreeSet::new(),
427 };
428 self.active_exports.insert(export.id, export_state);
429
430 self.update_scheduling(false);
432 }
433 }
434
435 fn update_scheduling(&mut self, send_commands: bool) {
454 #[derive(Debug)]
455 enum ObjectId {
456 Ingestion(GlobalId),
457 Export(GlobalId),
458 }
459 let mut scheduling_preferences: Vec<(ObjectId, bool)> = Vec::new();
464
465 for ingestion_id in self.active_ingestions.keys() {
466 let ingestion_description = self
467 .get_ingestion_description(ingestion_id)
468 .expect("missing ingestion description");
469
470 let prefers_single_replica = ingestion_description
471 .desc
472 .connection
473 .prefers_single_replica();
474
475 scheduling_preferences
476 .push((ObjectId::Ingestion(*ingestion_id), prefers_single_replica));
477 }
478
479 for export_id in self.active_exports.keys() {
480 scheduling_preferences.push((ObjectId::Export(*export_id), true));
482 }
483
484 let mut commands_by_replica: BTreeMap<ReplicaId, Vec<ObjectId>> = BTreeMap::new();
486
487 for (object_id, prefers_single_replica) in scheduling_preferences {
488 let active_replicas = match object_id {
489 ObjectId::Ingestion(ingestion_id) => {
490 &mut self
491 .active_ingestions
492 .get_mut(&ingestion_id)
493 .expect("missing ingestion state")
494 .active_replicas
495 }
496 ObjectId::Export(export_id) => {
497 &mut self
498 .active_exports
499 .get_mut(&export_id)
500 .expect("missing ingestion state")
501 .active_replicas
502 }
503 };
504
505 if prefers_single_replica {
506 if active_replicas.is_empty() {
508 let target_replica = self.replicas.keys().min().copied();
509 if let Some(first_replica_id) = target_replica {
510 tracing::info!(
511 object_id = ?object_id,
512 replica_id = %first_replica_id,
513 "scheduling single-replica object");
514 active_replicas.insert(first_replica_id);
515
516 commands_by_replica
517 .entry(first_replica_id)
518 .or_default()
519 .push(object_id);
520 }
521 } else {
522 tracing::info!(
523 ?object_id,
524 active_replicas = %active_replicas.iter().map(|id| id.to_string()).join(", "),
525 "single-replica object already running, not scheduling again",
526 );
527 }
528 } else {
529 let current_replica_ids: BTreeSet<_> = self.replicas.keys().copied().collect();
530 let unscheduled_replicas: Vec<_> = current_replica_ids
531 .difference(active_replicas)
532 .copied()
533 .collect();
534 for replica_id in unscheduled_replicas {
535 tracing::info!(
536 ?object_id,
537 %replica_id,
538 "scheduling multi-replica object"
539 );
540 active_replicas.insert(replica_id);
541 }
542 }
543 }
544
545 if send_commands {
546 for (replica_id, object_ids) in commands_by_replica {
547 let mut ingestion_commands = vec![];
548 let mut export_commands = vec![];
549 for object_id in object_ids {
550 match object_id {
551 ObjectId::Ingestion(id) => {
552 ingestion_commands.push(RunIngestionCommand {
553 id,
554 description: self
555 .get_ingestion_description(&id)
556 .expect("missing ingestion description")
557 .clone(),
558 });
559 }
560 ObjectId::Export(id) => {
561 export_commands.push(RunSinkCommand {
562 id,
563 description: self
564 .get_export_description(&id)
565 .expect("missing export description")
566 .clone(),
567 });
568 }
569 }
570 }
571 for ingestion in ingestion_commands {
572 let replica = self.replicas.get_mut(&replica_id).expect("missing replica");
573 let ingestion = Box::new(ingestion);
574 replica.send(StorageCommand::RunIngestion(ingestion));
575 }
576 for export in export_commands {
577 let replica = self.replicas.get_mut(&replica_id).expect("missing replica");
578 let export = Box::new(export);
579 replica.send(StorageCommand::RunSink(export));
580 }
581 }
582 }
583 }
584
585 pub fn get_ingestion_description(
593 &self,
594 id: &GlobalId,
595 ) -> Option<IngestionDescription<CollectionMetadata>> {
596 if !self.active_ingestions.contains_key(id) {
597 return None;
598 }
599
600 self.history.iter().rev().find_map(|command| {
601 if let StorageCommand::RunIngestion(ingestion) = command {
602 if &ingestion.id == id {
603 Some(ingestion.description.clone())
604 } else {
605 None
606 }
607 } else {
608 None
609 }
610 })
611 }
612
613 pub fn get_export_description(
621 &self,
622 id: &GlobalId,
623 ) -> Option<StorageSinkDesc<CollectionMetadata, T>> {
624 if !self.active_exports.contains_key(id) {
625 return None;
626 }
627
628 self.history.iter().rev().find_map(|command| {
629 if let StorageCommand::RunSink(sink) = command {
630 if &sink.id == id {
631 Some(sink.description.clone())
632 } else {
633 None
634 }
635 } else {
636 None
637 }
638 })
639 }
640
641 fn absorb_compaction(&mut self, id: GlobalId, frontier: Antichain<T>) {
643 tracing::debug!(?self.active_ingestions, ?id, ?frontier, "allow_compaction");
644
645 if frontier.is_empty() {
646 self.active_ingestions.remove(&id);
647 self.ingestion_exports.remove(&id);
648 self.active_exports.remove(&id);
649 }
650 }
651
652 fn active_replicas(&mut self, id: &GlobalId) -> Box<dyn Iterator<Item = &mut Replica<T>> + '_> {
654 if let Some(ingestion_id) = self.ingestion_exports.get(id) {
655 match self.active_ingestions.get(ingestion_id) {
656 Some(ingestion) => Box::new(self.replicas.iter_mut().filter_map(
657 move |(replica_id, replica)| {
658 if ingestion.active_replicas.contains(replica_id) {
659 Some(replica)
660 } else {
661 None
662 }
663 },
664 )),
665 None => {
666 Box::new(std::iter::empty())
668 }
669 }
670 } else if let Some(export) = self.active_exports.get(id) {
671 Box::new(
672 self.replicas
673 .iter_mut()
674 .filter_map(move |(replica_id, replica)| {
675 if export.active_replicas.contains(replica_id) {
676 Some(replica)
677 } else {
678 None
679 }
680 }),
681 )
682 } else {
683 Box::new(self.replicas.values_mut())
684 }
685 }
686
687 fn is_active_replica(&self, id: &GlobalId, replica_id: &ReplicaId) -> bool {
689 if let Some(ingestion_id) = self.ingestion_exports.get(id) {
690 match self.active_ingestions.get(ingestion_id) {
691 Some(ingestion) => ingestion.active_replicas.contains(replica_id),
692 None => {
693 false
695 }
696 }
697 } else if let Some(export) = self.active_exports.get(id) {
698 export.active_replicas.contains(replica_id)
699 } else {
700 true
702 }
703 }
704
705 pub(super) fn refresh_state_metrics(&self) {
714 let connected_replica_count = self.replicas.values().filter(|r| r.is_connected()).count();
715
716 self.metrics
717 .connected_replica_count
718 .set(u64::cast_from(connected_replica_count));
719 }
720
721 pub fn get_active_replicas_for_object(&self, id: &GlobalId) -> BTreeSet<ReplicaId> {
724 if let Some(ingestion_id) = self.ingestion_exports.get(id) {
725 match self.active_ingestions.get(ingestion_id) {
727 Some(ingestion) => ingestion.active_replicas.clone(),
728 None => {
729 BTreeSet::new()
731 }
732 }
733 } else {
734 self.replicas.keys().copied().collect()
736 }
737 }
738}
739
/// Configuration needed to connect to a replica.
#[derive(Clone, Debug)]
pub(super) struct ReplicaConfig {
    /// Build information; its semver version is passed along when connecting.
    pub build_info: &'static BuildInfo,
    /// Location of the replica; its `ctl_addrs` are the addresses connected to.
    pub location: ClusterReplicaLocation,
    /// Client parameters; passed to the gRPC client, and the source of the
    /// connect and keep-alive timeouts for the CTP client.
    pub grpc_client: GrpcClientParameters,
    /// Whether to connect through the CTP transport client instead of gRPC.
    pub enable_ctp: bool,
}
748
/// Controller-side handle to a single replica and the task that talks to it.
#[derive(Debug)]
pub struct Replica<T> {
    /// The configuration the replica was created with; reused when the
    /// replica is re-created after its task has finished.
    config: ReplicaConfig,
    /// Channel for handing commands to the replica task.
    command_tx: mpsc::UnboundedSender<StorageCommand<T>>,
    /// Handle to the spawned replica task (an abort-on-drop handle).
    task: AbortOnDropHandle<()>,
    /// Flag shared with the replica task; the task sets it once it has
    /// established a connection.
    connected: Arc<AtomicBool>,
}
764
765impl<T> Replica<T>
766where
767 T: Timestamp + Lattice + Sync,
768 StorageGrpcClient: StorageClient<T>,
769{
770 fn new(
772 id: ReplicaId,
773 config: ReplicaConfig,
774 metrics: ReplicaMetrics,
775 response_tx: mpsc::UnboundedSender<(Option<ReplicaId>, StorageResponse<T>)>,
776 ) -> Self {
777 let (command_tx, command_rx) = mpsc::unbounded_channel();
778 let connected = Arc::new(AtomicBool::new(false));
779
780 let task = mz_ore::task::spawn(
781 || "storage-replica-{id}",
782 ReplicaTask {
783 replica_id: id,
784 config: config.clone(),
785 metrics: metrics.clone(),
786 connected: Arc::clone(&connected),
787 command_rx,
788 response_tx,
789 }
790 .run(),
791 );
792
793 Self {
794 config,
795 command_tx,
796 task: task.abort_on_drop(),
797 connected,
798 }
799 }
800
801 fn send(&self, command: StorageCommand<T>) {
803 let _ = self.command_tx.send(command);
805 }
806
807 fn failed(&self) -> bool {
810 self.task.is_finished()
811 }
812
813 pub(super) fn is_connected(&self) -> bool {
815 self.connected.load(atomic::Ordering::Relaxed)
816 }
817}
818
/// Transport client specialized to storage commands and responses; used when
/// CTP is enabled for a replica.
type StorageCtpClient<T> = transport::Client<StorageCommand<T>, StorageResponse<T>>;
820
/// A client connected to a replica, over either of the two supported
/// transports.
#[derive(Debug)]
enum ReplicaClient<T: Timestamp + Lattice> {
    /// Partitioned client backed by gRPC connections.
    Grpc(Partitioned<StorageGrpcClient, StorageCommand<T>, StorageResponse<T>>),
    /// Partitioned client backed by CTP transport connections.
    Ctp(Partitioned<StorageCtpClient<T>, StorageCommand<T>, StorageResponse<T>>),
}
826
827#[async_trait]
828impl<T> GenericClient<StorageCommand<T>, StorageResponse<T>> for ReplicaClient<T>
829where
830 T: Timestamp + Lattice + Sync,
831 StorageGrpcClient: StorageClient<T>,
832{
833 async fn send(&mut self, cmd: StorageCommand<T>) -> anyhow::Result<()> {
834 match self {
835 Self::Grpc(client) => client.send(cmd).await,
836 Self::Ctp(client) => client.send(cmd).await,
837 }
838 }
839
840 async fn recv(&mut self) -> anyhow::Result<Option<StorageResponse<T>>> {
846 match self {
847 Self::Grpc(client) => client.recv().await,
849 Self::Ctp(client) => client.recv().await,
851 }
852 }
853}
854
/// State of the asynchronous task that connects to one replica and relays
/// commands and responses between it and the controller.
struct ReplicaTask<T> {
    /// ID of the replica this task serves; attached to forwarded responses
    /// and log messages.
    replica_id: ReplicaId,
    /// Connection configuration for the replica.
    config: ReplicaConfig,
    /// Replica-level metrics; records connect attempts and is handed to the
    /// clients created when connecting.
    metrics: ReplicaMetrics,
    /// Flag shared with the `Replica` handle; set once connected.
    connected: Arc<AtomicBool>,
    /// Commands from the controller that are to be forwarded to the replica.
    command_rx: mpsc::UnboundedReceiver<StorageCommand<T>>,
    /// Channel on which responses from the replica are forwarded, tagged with
    /// `replica_id`.
    response_tx: mpsc::UnboundedSender<(Option<ReplicaId>, StorageResponse<T>)>,
}
870
871impl<T> ReplicaTask<T>
872where
873 T: Timestamp + Lattice + Sync,
874 StorageGrpcClient: StorageClient<T>,
875{
876 async fn run(self) {
878 let replica_id = self.replica_id;
879 info!(%replica_id, "starting replica task");
880
881 let client = self.connect().await;
882 match self.run_message_loop(client).await {
883 Ok(()) => info!(%replica_id, "stopped replica task"),
884 Err(error) => warn!(%replica_id, %error, "replica task failed"),
885 }
886 }
887
    /// Connects to the replica, retrying until a connection is established.
    ///
    /// Depending on `config.enable_ctp`, either a partitioned CTP client or a
    /// partitioned gRPC client is created for the replica's `ctl_addrs`. The
    /// duration of every attempt is recorded in the metrics. Once connected,
    /// the connect is recorded and the shared `connected` flag is set.
    ///
    /// # Panics
    ///
    /// Panics if the retry gives up; the `expect` encodes the assumption that
    /// the default retry policy never does.
    async fn connect(&self) -> ReplicaClient<T> {
        // A single connection attempt; invoked repeatedly by the retry below.
        let try_connect = async move |retry: RetryState| {
            let version = self.config.build_info.semver_version();
            let client_params = &self.config.grpc_client;

            let connect_start = Instant::now();
            let connect_result = if self.config.enable_ctp {
                // Unset timeouts are passed as `Duration::MAX`.
                let connect_timeout = client_params.connect_timeout.unwrap_or(Duration::MAX);
                let keepalive_timeout = client_params
                    .http2_keep_alive_timeout
                    .unwrap_or(Duration::MAX);

                StorageCtpClient::<T>::connect_partitioned(
                    self.config.location.ctl_addrs.clone(),
                    version,
                    connect_timeout,
                    keepalive_timeout,
                    self.metrics.clone(),
                )
                .await
                .map(ReplicaClient::Ctp)
            } else {
                // Each address is paired with its own handle to the metrics.
                let addrs = &self.config.location.ctl_addrs;
                let dests = addrs
                    .iter()
                    .map(|addr| (addr.clone(), self.metrics.clone()))
                    .collect();
                StorageGrpcClient::connect_partitioned(dests, version, client_params)
                    .await
                    .map(ReplicaClient::Grpc)
            };

            // Recorded for failed attempts as well as successful ones.
            self.metrics.observe_connect_time(connect_start.elapsed());

            connect_result.inspect_err(|error| {
                // Assumes a next backoff is always present, i.e. the retry
                // never reports a final attempt.
                let next_backoff = retry.next_backoff.unwrap();
                // Early failures are logged at debug level; from
                // `INFO_MIN_RETRIES` attempts on they are logged at info level.
                if retry.i >= mz_service::retry::INFO_MIN_RETRIES {
                    info!(
                        replica_id = %self.replica_id, ?next_backoff,
                        "error connecting to replica: {error:#}",
                    );
                } else {
                    debug!(
                        replica_id = %self.replica_id, ?next_backoff,
                        "error connecting to replica: {error:#}",
                    );
                }
            })
        };

        // Presumably caps the backoff between attempts at one second — see
        // `Retry::clamp_backoff`.
        let client = Retry::default()
            .clamp_backoff(Duration::from_secs(1))
            .retry_async(try_connect)
            .await
            .expect("retries forever");

        self.metrics.observe_connect();
        self.connected.store(true, atomic::Ordering::Relaxed);

        client
    }
953
    /// Relays messages between the controller and the replica until one side
    /// goes away.
    ///
    /// Returns `Ok(())` when the controller has dropped either the command
    /// sender or the response receiver.
    ///
    /// # Errors
    ///
    /// Returns an error when sending to or receiving from the replica fails,
    /// or when the replica closes the connection (`recv` yields `None`).
    async fn run_message_loop(mut self, mut client: ReplicaClient<T>) -> Result<(), anyhow::Error> {
        loop {
            select! {
                // A command from the controller, to be forwarded to the replica.
                command = self.command_rx.recv() => {
                    // `None` means all command senders have been dropped.
                    let Some(mut command) = command else {
                        tracing::debug!(%self.replica_id, "controller is no longer interested in this replica, shutting down message loop");
                        break;
                    };

                    self.specialize_command(&mut command);
                    client.send(command).await?;
                },
                // A response from the replica, to be forwarded to the controller.
                response = client.recv() => {
                    let Some(response) = response? else {
                        bail!("replica unexpectedly gracefully terminated connection");
                    };

                    // A failed send means the response receiver has been dropped.
                    if self.response_tx.send((Some(self.replica_id), response)).is_err() {
                        tracing::debug!(%self.replica_id, "controller (receiver) is no longer interested in this replica, shutting down message loop");
                        break;
                    }
                }
            }
        }

        Ok(())
    }
990
991 fn specialize_command(&self, command: &mut StorageCommand<T>) {
996 if let StorageCommand::Hello { nonce } = command {
997 *nonce = Uuid::new_v4();
998 }
999 }
1000}