1use std::cell::RefCell;
13use std::collections::btree_map::Entry;
14use std::collections::{BTreeMap, BTreeSet};
15use std::fmt;
16use std::fmt::Debug;
17use std::rc::Rc;
18use std::time::Duration;
19
20use chrono::{DateTime, Utc};
21use differential_dataflow::Hashable;
22use futures::StreamExt;
23use mz_ore::cast::CastFrom;
24use mz_ore::now::NowFn;
25use mz_repr::GlobalId;
26use mz_storage_client::client::{Status, StatusUpdate};
27use mz_timely_util::builder_async::{
28 Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
29};
30use timely::dataflow::channels::pact::Exchange;
31use timely::dataflow::operators::Map;
32use timely::dataflow::{Scope, Stream};
33use tracing::{error, info};
34
35use crate::internal_control::{InternalCommandSender, InternalStorageCommand};
36
/// The namespace (subsystem) a health update is reported under.
///
/// The derived `Ord` follows declaration order, and that order matters: when several namespaces
/// report errors, `PerWorkerHealthStatus::decide_status` uses the error of the greatest
/// namespace (`BTreeMap::last_key_value`) as the summary error.
///
/// "Sidechannel" namespaces (see `is_sidechannel`) may report `Running` without marking the
/// whole object as running. Keep `is_sidechannel` and the `Display` impl in sync when adding
/// variants.
#[derive(Copy, Clone, Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub enum StatusNamespace {
    Generator,
    Kafka,
    Postgres,
    MySql,
    /// Displayed as `sql-server`.
    SqlServer,
    /// Currently the only sidechannel namespace.
    Ssh,
    Upsert,
    Decode,
    Internal,
}
58
59impl StatusNamespace {
60 fn is_sidechannel(&self) -> bool {
61 matches!(self, StatusNamespace::Ssh)
62 }
63}
64
65impl fmt::Display for StatusNamespace {
66 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
67 use StatusNamespace::*;
68 match self {
69 Generator => write!(f, "generator"),
70 Kafka => write!(f, "kafka"),
71 Postgres => write!(f, "postgres"),
72 MySql => write!(f, "mysql"),
73 SqlServer => write!(f, "sql-server"),
74 Ssh => write!(f, "ssh"),
75 Upsert => write!(f, "upsert"),
76 Decode => write!(f, "decode"),
77 Internal => write!(f, "internal"),
78 }
79 }
80}
81
/// The latest health update per namespace, kept separately for every worker.
#[derive(Debug)]
struct PerWorkerHealthStatus {
    /// Indexed by worker id; `HealthState::new` allocates one (initially empty) map per worker.
    pub(crate) errors_by_worker: Vec<BTreeMap<StatusNamespace, HealthStatusUpdate>>,
}
86
87impl PerWorkerHealthStatus {
88 fn merge_update(
89 &mut self,
90 worker: usize,
91 namespace: StatusNamespace,
92 update: HealthStatusUpdate,
93 only_greater: bool,
94 ) {
95 let errors = &mut self.errors_by_worker[worker];
96 match errors.entry(namespace) {
97 Entry::Vacant(v) => {
98 v.insert(update);
99 }
100 Entry::Occupied(mut o) => {
101 if !only_greater || o.get() < &update {
102 o.insert(update);
103 }
104 }
105 }
106 }
107
108 fn decide_status(&self) -> OverallStatus {
109 let mut output_status = OverallStatus::Starting;
110 let mut namespaced_errors: BTreeMap<StatusNamespace, String> = BTreeMap::new();
111 let mut hints: BTreeSet<String> = BTreeSet::new();
112
113 for status in self.errors_by_worker.iter() {
114 for (ns, ns_status) in status.iter() {
115 match ns_status {
116 HealthStatusUpdate::Ceased { error } => {
121 if Some(error) > namespaced_errors.get(ns).as_deref() {
122 namespaced_errors.insert(*ns, error.to_string());
123 }
124 }
125 HealthStatusUpdate::Stalled { error, hint, .. } => {
126 if Some(error) > namespaced_errors.get(ns).as_deref() {
127 namespaced_errors.insert(*ns, error.to_string());
128 }
129
130 if let Some(hint) = hint {
131 hints.insert(hint.to_string());
132 }
133 }
134 HealthStatusUpdate::Running => {
135 if !ns.is_sidechannel() {
136 output_status = OverallStatus::Running;
137 }
138 }
139 }
140 }
141 }
142
143 if !namespaced_errors.is_empty() {
144 let (ns, err) = namespaced_errors.last_key_value().unwrap();
146 output_status = OverallStatus::Stalled {
147 error: format!("{}: {}", ns, err),
148 hints,
149 namespaced_errors,
150 }
151 }
152
153 output_status
154 }
155}
156
/// The aggregated health of one collection, as computed from all workers' updates.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub enum OverallStatus {
    /// No error, and no `Running` update from a non-sidechannel namespace, has been seen.
    Starting,
    /// At least one non-sidechannel namespace reported `Running` and there are no errors.
    Running,
    /// At least one namespace reported an error.
    Stalled {
        /// Summary error, formatted as `"{namespace}: {error}"` for the greatest namespace.
        error: String,
        /// Union of the hints attached to stalled updates.
        hints: BTreeSet<String>,
        /// The error recorded for every erroring namespace.
        namespaced_errors: BTreeMap<StatusNamespace, String>,
    },
    /// Not produced by `PerWorkerHealthStatus::decide_status` in this file.
    Ceased {
        error: String,
    },
}
170
171impl OverallStatus {
172 pub(crate) fn error(&self) -> Option<&str> {
174 match self {
175 OverallStatus::Starting | OverallStatus::Running => None,
176 OverallStatus::Stalled { error, .. } | OverallStatus::Ceased { error, .. } => {
177 Some(error)
178 }
179 }
180 }
181
182 pub(crate) fn errors(&self) -> Option<&BTreeMap<StatusNamespace, String>> {
184 match self {
185 OverallStatus::Starting | OverallStatus::Running | OverallStatus::Ceased { .. } => None,
186 OverallStatus::Stalled {
187 namespaced_errors, ..
188 } => Some(namespaced_errors),
189 }
190 }
191
192 pub(crate) fn hints(&self) -> BTreeSet<String> {
194 match self {
195 OverallStatus::Starting | OverallStatus::Running | OverallStatus::Ceased { .. } => {
196 BTreeSet::new()
197 }
198 OverallStatus::Stalled { hints, .. } => hints.clone(),
199 }
200 }
201}
202
203impl<'a> From<&'a OverallStatus> for Status {
204 fn from(val: &'a OverallStatus) -> Self {
205 match val {
206 OverallStatus::Starting => Status::Starting,
207 OverallStatus::Running => Status::Running,
208 OverallStatus::Stalled { .. } => Status::Stalled,
209 OverallStatus::Ceased { .. } => Status::Ceased,
210 }
211 }
212}
213
/// Everything the health operator tracks for one collection.
#[derive(Debug)]
struct HealthState {
    /// The latest update per worker and namespace.
    healths: PerWorkerHealthStatus,
    /// The last status passed to `HealthOperator::record_new_status`, used to report only changes.
    last_reported_status: Option<OverallStatus>,
    /// The most recent update that requested a halt, with its namespace; never cleared once set.
    halt_with: Option<(StatusNamespace, HealthStatusUpdate)>,
}
220
221impl HealthState {
222 fn new(worker_count: usize) -> HealthState {
223 HealthState {
224 healths: PerWorkerHealthStatus {
225 errors_by_worker: vec![Default::default(); worker_count],
226 },
227 last_reported_status: None,
228 halt_with: None,
229 }
230 }
231}
232
/// The output side of `health_operator`: where status transitions are recorded and halts are
/// requested.
pub trait HealthOperator {
    /// Record that `collection_id` transitioned to `new_status` at `ts`.
    ///
    /// `new_error`, `hints` and `namespaced_errors` carry the details of the status (see
    /// `OverallStatus::error`/`hints`/`errors`).
    fn record_new_status(
        &self,
        collection_id: GlobalId,
        ts: DateTime<Utc>,
        new_status: Status,
        new_error: Option<&str>,
        hints: &BTreeSet<String>,
        namespaced_errors: &BTreeMap<StatusNamespace, String>,
        // Whether the implementation should keep `namespaced_errors` (e.g. `DefaultWriter`
        // writes an empty map when this is `false`).
        write_namespaced_map: bool,
    );
    /// Request that object `id` be halted; `error` is the namespace and update that asked for it.
    fn send_halt(&self, id: GlobalId, error: Option<(StatusNamespace, HealthStatusUpdate)>);

    /// Optionally pin the worker that aggregates health updates. The default, `None`, lets
    /// `health_operator` derive one by hashing.
    fn chosen_worker(&self) -> Option<usize> {
        None
    }
}
260
/// The default [`HealthOperator`]: buffers status updates in memory and sends halt requests as
/// internal storage commands.
pub struct DefaultWriter {
    /// Used by `send_halt` to issue `InternalStorageCommand::SuspendAndRestart`.
    pub command_tx: InternalCommandSender,
    /// Shared buffer that `record_new_status` appends to; presumably drained by the owner
    /// elsewhere (TODO confirm).
    pub updates: Rc<RefCell<Vec<StatusUpdate>>>,
}
266
267impl HealthOperator for DefaultWriter {
268 fn record_new_status(
269 &self,
270 collection_id: GlobalId,
271 ts: DateTime<Utc>,
272 status: Status,
273 new_error: Option<&str>,
274 hints: &BTreeSet<String>,
275 namespaced_errors: &BTreeMap<StatusNamespace, String>,
276 write_namespaced_map: bool,
277 ) {
278 self.updates.borrow_mut().push(StatusUpdate {
279 id: collection_id,
280 timestamp: ts,
281 status,
282 error: new_error.map(|e| e.to_string()),
283 hints: hints.clone(),
284 namespaced_errors: if write_namespaced_map {
285 namespaced_errors
286 .iter()
287 .map(|(ns, val)| (ns.to_string(), val.clone()))
288 .collect()
289 } else {
290 BTreeMap::new()
291 },
292 replica_id: None,
293 });
294 }
295
296 fn send_halt(&self, id: GlobalId, error: Option<(StatusNamespace, HealthStatusUpdate)>) {
297 self.command_tx
298 .send(InternalStorageCommand::SuspendAndRestart {
299 id,
302 reason: format!("{:?}", error),
303 });
304 }
305}
306
/// A health update emitted by some part of a dataflow and consumed by `health_operator`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub struct HealthStatusMessage {
    /// The collection the update is about; `None` means the operator's `halting_id`.
    pub id: Option<GlobalId>,
    /// The namespace the update is reported under.
    pub namespace: StatusNamespace,
    /// The update itself.
    pub update: HealthStatusUpdate,
}
318
319pub(crate) fn health_operator<G, P>(
327 scope: &G,
328 now: NowFn,
329 mark_starting: BTreeSet<GlobalId>,
331 halting_id: GlobalId,
334 object_type: &'static str,
336 health_stream: &Stream<G, HealthStatusMessage>,
338 health_operator_impl: P,
340 write_namespaced_map: bool,
342 suspend_and_restart_delay: Duration,
345) -> PressOnDropButton
346where
347 G: Scope,
348 P: HealthOperator + 'static,
349{
350 let healthcheck_worker_id = scope.index();
352 let worker_count = scope.peers();
353
354 let health_stream = health_stream.map(move |status| (healthcheck_worker_id, status));
356
357 let chosen_worker_id = if let Some(index) = health_operator_impl.chosen_worker() {
358 index
359 } else {
360 usize::cast_from(mark_starting.iter().next().hashed()) % worker_count
363 };
364
365 let is_active_worker = chosen_worker_id == healthcheck_worker_id;
366
367 let operator_name = format!("healthcheck({})", healthcheck_worker_id);
368 let mut health_op = AsyncOperatorBuilder::new(operator_name, scope.clone());
369
370 let mut input = health_op.new_disconnected_input(
371 &health_stream,
372 Exchange::new(move |_| u64::cast_from(chosen_worker_id)),
373 );
374
375 let button = health_op.build(move |mut _capabilities| async move {
376 let mut health_states: BTreeMap<_, _> = mark_starting
377 .iter()
378 .copied()
379 .chain([halting_id])
380 .map(|id| (id, HealthState::new(worker_count)))
381 .collect();
382
383 if is_active_worker {
385 for (id, state) in health_states.iter_mut() {
386 let status = OverallStatus::Starting;
387 let timestamp = mz_ore::now::to_datetime(now());
388 health_operator_impl.record_new_status(
389 *id,
390 timestamp,
391 (&status).into(),
392 status.error(),
393 &status.hints(),
394 status.errors().unwrap_or(&BTreeMap::new()),
395 write_namespaced_map,
396 );
397
398 state.last_reported_status = Some(status);
399 }
400 }
401
402 let mut outputs_seen = BTreeMap::<GlobalId, BTreeSet<_>>::new();
403 while let Some(event) = input.next().await {
404 if let AsyncEvent::Data(_cap, rows) = event {
405 for (worker_id, message) in rows {
406 let HealthStatusMessage {
407 id,
408 namespace: ns,
409 update: health_event,
410 } = message;
411 let id = id.unwrap_or(halting_id);
412 let HealthState {
413 healths, halt_with, ..
414 } = match health_states.get_mut(&id) {
415 Some(health) => health,
416 None => continue,
420 };
421
422 let new_round = outputs_seen
425 .entry(id)
426 .or_insert_with(BTreeSet::new)
427 .insert(ns.clone());
428
429 if !is_active_worker {
430 error!(
431 "Health messages for {object_type} {id} passed to \
432 an unexpected worker id: {healthcheck_worker_id}"
433 )
434 }
435
436 if health_event.should_halt() {
437 *halt_with = Some((ns.clone(), health_event.clone()));
438 }
439
440 healths.merge_update(worker_id, ns, health_event, !new_round);
441 }
442
443 let mut halt_with_outer = None;
444
445 while let Some((id, _)) = outputs_seen.pop_first() {
446 let HealthState {
447 healths,
448 last_reported_status,
449 halt_with,
450 } = health_states.get_mut(&id).expect("known to exist");
451
452 let new_status = healths.decide_status();
453
454 if Some(&new_status) != last_reported_status.as_ref() {
455 info!(
456 "Health transition for {object_type} {id}: \
457 {last_reported_status:?} -> {:?}",
458 Some(&new_status)
459 );
460
461 let timestamp = mz_ore::now::to_datetime(now());
462 health_operator_impl.record_new_status(
463 id,
464 timestamp,
465 (&new_status).into(),
466 new_status.error(),
467 &new_status.hints(),
468 new_status.errors().unwrap_or(&BTreeMap::new()),
469 write_namespaced_map,
470 );
471
472 *last_reported_status = Some(new_status.clone());
473 }
474
475 if halt_with_outer.is_none() && halt_with.is_some() {
477 halt_with_outer = Some((id, halt_with.clone()));
478 }
479 }
480
481 if let Some((id, halt_with)) = halt_with_outer {
486 mz_ore::soft_assert_or_log!(
487 id == halting_id,
488 "sub{object_type}s should not produce \
489 halting errors, however {:?} halted while primary \
490 {object_type} is {:?}",
491 id,
492 halting_id
493 );
494
495 info!(
496 "Broadcasting suspend-and-restart \
497 command because of {:?} after {:?} delay",
498 halt_with, suspend_and_restart_delay
499 );
500 tokio::time::sleep(suspend_and_restart_delay).await;
501 health_operator_impl.send_halt(id, halt_with);
502 }
503 }
504 }
505 });
506
507 button.press_on_drop()
508}
509
510use serde::{Deserialize, Serialize};
511
/// A single health report for one namespace, as produced by one worker.
///
/// The derived `Ord` (`Running` < `Stalled` < `Ceased`, then by field values) is what
/// `PerWorkerHealthStatus::merge_update` uses when only greater updates may replace older ones.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub enum HealthStatusUpdate {
    /// The namespace is healthy.
    Running,
    /// The namespace hit an error.
    Stalled {
        error: String,
        /// Optional user-facing hint, unioned into `OverallStatus::Stalled::hints`.
        hint: Option<String>,
        /// When `true`, the health operator halts the object (see `should_halt`).
        should_halt: bool,
    },
    /// Aggregated like a stall error by `decide_status`; never requests a halt.
    Ceased {
        error: String,
    },
}
527
528impl HealthStatusUpdate {
529 pub(crate) fn running() -> Self {
531 HealthStatusUpdate::Running
532 }
533
534 pub(crate) fn stalled(error: String, hint: Option<String>) -> Self {
536 HealthStatusUpdate::Stalled {
537 error,
538 hint,
539 should_halt: false,
540 }
541 }
542
543 pub(crate) fn halting(error: String, hint: Option<String>) -> Self {
545 HealthStatusUpdate::Stalled {
546 error,
547 hint,
548 should_halt: true,
549 }
550 }
551
552 pub(crate) fn should_halt(&self) -> bool {
560 match self {
561 HealthStatusUpdate::Running |
562 HealthStatusUpdate::Ceased { .. } => false,
567 HealthStatusUpdate::Stalled { should_halt, .. } => *should_halt,
568 }
569 }
570}
571
572#[cfg(test)]
573mod tests {
574 use super::*;
575 use itertools::Itertools;
576
/// Two workers tracking two collections:
/// - `GlobalId::User(0)`, the runner's halting id, is collection index 0.
/// - `GlobalId::User(1)` is collection index 1.
///
/// Both start as `Starting`, and updates without an id go to collection 0. A stall reported by
/// one worker stalls a collection even while another worker reports `Running`, and it clears
/// once that worker reports `Running` again.
#[mz_ore::test]
#[cfg_attr(miri, ignore)]
fn test_health_operator_basic() {
    use Step::*;

    // 2 workers, 2 collections, namespaced error map written.
    health_operator_runner(
        2,
        2,
        true,
        vec![
            AssertStatus(vec![
                StatusToAssert {
                    collection_index: 0,
                    status: Status::Starting,
                    ..Default::default()
                },
                StatusToAssert {
                    collection_index: 1,
                    status: Status::Starting,
                    ..Default::default()
                },
            ]),
            // `id: None` is attributed to the halting id, i.e. collection 0.
            Update(TestUpdate {
                worker_id: 1,
                namespace: StatusNamespace::Generator,
                id: None,
                update: HealthStatusUpdate::running(),
            }),
            AssertStatus(vec![StatusToAssert {
                collection_index: 0,
                status: Status::Running,
                ..Default::default()
            }]),
            Update(TestUpdate {
                worker_id: 1,
                namespace: StatusNamespace::Generator,
                id: Some(GlobalId::User(1)),
                update: HealthStatusUpdate::running(),
            }),
            AssertStatus(vec![StatusToAssert {
                collection_index: 1,
                status: Status::Running,
                ..Default::default()
            }]),
            // Worker 0 stalls collection 1 even though worker 1 still reports running.
            Update(TestUpdate {
                worker_id: 0,
                namespace: StatusNamespace::Generator,
                id: Some(GlobalId::User(1)),
                update: HealthStatusUpdate::stalled("uhoh".to_string(), None),
            }),
            AssertStatus(vec![StatusToAssert {
                collection_index: 1,
                status: Status::Stalled,
                error: Some("generator: uhoh".to_string()),
                errors: Some("generator: uhoh".to_string()),
                ..Default::default()
            }]),
            // Worker 0 recovers, so collection 1 is running again.
            Update(TestUpdate {
                worker_id: 0,
                namespace: StatusNamespace::Generator,
                id: Some(GlobalId::User(1)),
                update: HealthStatusUpdate::running(),
            }),
            AssertStatus(vec![StatusToAssert {
                collection_index: 1,
                status: Status::Running,
                ..Default::default()
            }]),
        ],
    );
}
660
/// With `write_namespaced_map` disabled, the summary error is still reported but the
/// per-namespace error map is not (`errors: None`).
#[mz_ore::test]
#[cfg_attr(miri, ignore)]
fn test_health_operator_write_namespaced_map() {
    use Step::*;

    // 2 workers, 2 collections, namespaced error map NOT written.
    health_operator_runner(
        2,
        2,
        false,
        vec![
            AssertStatus(vec![
                StatusToAssert {
                    collection_index: 0,
                    status: Status::Starting,
                    ..Default::default()
                },
                StatusToAssert {
                    collection_index: 1,
                    status: Status::Starting,
                    ..Default::default()
                },
            ]),
            Update(TestUpdate {
                worker_id: 0,
                namespace: StatusNamespace::Generator,
                id: Some(GlobalId::User(1)),
                update: HealthStatusUpdate::stalled("uhoh".to_string(), None),
            }),
            AssertStatus(vec![StatusToAssert {
                collection_index: 1,
                status: Status::Stalled,
                error: Some("generator: uhoh".to_string()),
                errors: None,
                ..Default::default()
            }]),
        ],
    )
}
702
/// Errors from several namespaces are tracked independently.
/// - The summary error comes from the greatest namespace by declaration order (`Kafka` over
///   `Generator`).
/// - Each namespace's error clears only when that namespace reports `Running`.
#[mz_ore::test]
#[cfg_attr(miri, ignore)]
fn test_health_operator_namespaces() {
    use Step::*;

    // 2 workers, 1 collection, namespaced error map written.
    health_operator_runner(
        2,
        1,
        true,
        vec![
            AssertStatus(vec![
                StatusToAssert {
                    collection_index: 0,
                    status: Status::Starting,
                    ..Default::default()
                },
            ]),
            Update(TestUpdate {
                worker_id: 0,
                namespace: StatusNamespace::Generator,
                id: None,
                update: HealthStatusUpdate::stalled("uhoh".to_string(), None),
            }),
            AssertStatus(vec![StatusToAssert {
                collection_index: 0,
                status: Status::Stalled,
                error: Some("generator: uhoh".to_string()),
                errors: Some("generator: uhoh".to_string()),
                ..Default::default()
            }]),
            // A second namespace errors; `kafka` sorts after `generator`, so it is the summary.
            Update(TestUpdate {
                worker_id: 0,
                namespace: StatusNamespace::Kafka,
                id: None,
                update: HealthStatusUpdate::stalled("uhoh".to_string(), None),
            }),
            AssertStatus(vec![StatusToAssert {
                collection_index: 0,
                status: Status::Stalled,
                error: Some("kafka: uhoh".to_string()),
                errors: Some("generator: uhoh, kafka: uhoh".to_string()),
                ..Default::default()
            }]),
            // Kafka recovers, but the generator error remains.
            Update(TestUpdate {
                worker_id: 0,
                namespace: StatusNamespace::Kafka,
                id: None,
                update: HealthStatusUpdate::running(),
            }),
            AssertStatus(vec![StatusToAssert {
                collection_index: 0,
                status: Status::Stalled,
                error: Some("generator: uhoh".to_string()),
                errors: Some("generator: uhoh".to_string()),
                ..Default::default()
            }]),
            Update(TestUpdate {
                worker_id: 0,
                namespace: StatusNamespace::Generator,
                id: None,
                update: HealthStatusUpdate::running(),
            }),
            AssertStatus(vec![StatusToAssert {
                collection_index: 0,
                status: Status::Running,
                ..Default::default()
            }]),
        ],
    );
}
779
/// The `Ssh` sidechannel namespace can stall the collection. However, its `Running` update
/// only clears the error; it does not mark the collection as running (the status falls back
/// to `Starting`).
#[mz_ore::test]
#[cfg_attr(miri, ignore)]
fn test_health_operator_namespace_side_channel() {
    use Step::*;

    // 2 workers, 1 collection, namespaced error map written.
    health_operator_runner(
        2,
        1,
        true,
        vec![
            AssertStatus(vec![
                StatusToAssert {
                    collection_index: 0,
                    status: Status::Starting,
                    ..Default::default()
                },
            ]),
            Update(TestUpdate {
                worker_id: 0,
                namespace: StatusNamespace::Ssh,
                id: None,
                update: HealthStatusUpdate::stalled("uhoh".to_string(), None),
            }),
            AssertStatus(vec![StatusToAssert {
                collection_index: 0,
                status: Status::Stalled,
                error: Some("ssh: uhoh".to_string()),
                errors: Some("ssh: uhoh".to_string()),
                ..Default::default()
            }]),
            Update(TestUpdate {
                worker_id: 0,
                namespace: StatusNamespace::Ssh,
                id: None,
                update: HealthStatusUpdate::stalled("uhoh2".to_string(), None),
            }),
            AssertStatus(vec![StatusToAssert {
                collection_index: 0,
                status: Status::Stalled,
                error: Some("ssh: uhoh2".to_string()),
                errors: Some("ssh: uhoh2".to_string()),
                ..Default::default()
            }]),
            // A sidechannel `Running` clears the error but does not mark the object running.
            Update(TestUpdate {
                worker_id: 0,
                namespace: StatusNamespace::Ssh,
                id: None,
                update: HealthStatusUpdate::running(),
            }),
            AssertStatus(vec![StatusToAssert {
                collection_index: 0,
                status: Status::Starting,
                ..Default::default()
            }]),
            Update(TestUpdate {
                worker_id: 0,
                namespace: StatusNamespace::Generator,
                id: None,
                update: HealthStatusUpdate::running(),
            }),
            AssertStatus(vec![StatusToAssert {
                collection_index: 0,
                status: Status::Running,
                ..Default::default()
            }]),
        ],
    );
}
853
/// Hints from all workers' stalled updates are unioned.
/// - A worker's newer update replaces its older hint.
/// - Within a namespace, the greatest error message across workers is reported.
#[mz_ore::test]
#[cfg_attr(miri, ignore)]
fn test_health_operator_hints() {
    use Step::*;

    // 2 workers, 1 collection, namespaced error map written.
    health_operator_runner(
        2,
        1,
        true,
        vec![
            AssertStatus(vec![
                StatusToAssert {
                    collection_index: 0,
                    status: Status::Starting,
                    ..Default::default()
                },
            ]),
            Update(TestUpdate {
                worker_id: 0,
                namespace: StatusNamespace::Generator,
                id: None,
                update: HealthStatusUpdate::stalled(
                    "uhoh".to_string(),
                    Some("hint1".to_string()),
                ),
            }),
            AssertStatus(vec![StatusToAssert {
                collection_index: 0,
                status: Status::Stalled,
                error: Some("generator: uhoh".to_string()),
                errors: Some("generator: uhoh".to_string()),
                hint: Some("hint1".to_string()),
            }]),
            // Worker 1 stalls too: the greater error ("uhoh2") wins; hints are unioned.
            Update(TestUpdate {
                worker_id: 1,
                namespace: StatusNamespace::Generator,
                id: None,
                update: HealthStatusUpdate::stalled(
                    "uhoh2".to_string(),
                    Some("hint2".to_string()),
                ),
            }),
            AssertStatus(vec![StatusToAssert {
                collection_index: 0,
                status: Status::Stalled,
                error: Some("generator: uhoh2".to_string()),
                errors: Some("generator: uhoh2".to_string()),
                hint: Some("hint1, hint2".to_string()),
            }]),
            // Worker 1's new update replaces its previous hint.
            Update(TestUpdate {
                worker_id: 1,
                namespace: StatusNamespace::Generator,
                id: None,
                update: HealthStatusUpdate::stalled(
                    "uhoh2".to_string(),
                    Some("hint3".to_string()),
                ),
            }),
            AssertStatus(vec![StatusToAssert {
                collection_index: 0,
                status: Status::Stalled,
                error: Some("generator: uhoh2".to_string()),
                errors: Some("generator: uhoh2".to_string()),
                hint: Some("hint1, hint3".to_string()),
            }]),
            // Worker 0 recovers: its error and hint disappear.
            Update(TestUpdate {
                worker_id: 0,
                namespace: StatusNamespace::Generator,
                id: None,
                update: HealthStatusUpdate::running(),
            }),
            AssertStatus(vec![StatusToAssert {
                collection_index: 0,
                status: Status::Stalled,
                error: Some("generator: uhoh2".to_string()),
                errors: Some("generator: uhoh2".to_string()),
                hint: Some("hint3".to_string()),
            }]),
            Update(TestUpdate {
                worker_id: 1,
                namespace: StatusNamespace::Generator,
                id: None,
                update: HealthStatusUpdate::running(),
            }),
            AssertStatus(vec![StatusToAssert {
                collection_index: 0,
                status: Status::Running,
                ..Default::default()
            }]),
        ],
    );
}
953
954 use mz_ore::assert_err;
957 use timely::container::CapacityContainerBuilder;
958 use timely::dataflow::Scope;
959 use timely::dataflow::operators::Enter;
960 use timely::dataflow::operators::exchange::Exchange;
961 use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
962
/// One status as recorded by `TestWriter`, and as expected by an `AssertStatus` step.
#[derive(Debug, Clone, PartialEq, Eq)]
struct StatusToAssert {
    /// Index of the collection (see `TestWriter::input_mapping`).
    collection_index: usize,
    status: Status,
    /// The summary error.
    error: Option<String>,
    /// Namespaced errors rendered as `"ns: err"` joined by `", "`; `None` when there are none
    /// or the map is not written.
    errors: Option<String>,
    /// Hints joined by `", "`; `None` when there are none.
    hint: Option<String>,
}
972
973 impl Default for StatusToAssert {
974 fn default() -> Self {
975 StatusToAssert {
976 collection_index: Default::default(),
977 status: Status::Running,
978 error: Default::default(),
979 errors: Default::default(),
980 hint: Default::default(),
981 }
982 }
983 }
984
/// A health update to inject into the operator under test.
#[derive(Debug, Clone)]
struct TestUpdate {
    /// `producer` exchanges the update on this value, so it becomes the worker the health
    /// operator attributes the update to.
    worker_id: u64,
    namespace: StatusNamespace,
    /// Target collection; `None` means the runner's halting id (collection 0).
    id: Option<GlobalId>,
    update: HealthStatusUpdate,
}
994
/// One step of a `health_operator_runner` script.
#[derive(Debug, Clone)]
enum Step {
    /// Inject a health update.
    Update(TestUpdate),
    /// Drive the dataflow until exactly these statuses have been recorded (any order).
    AssertStatus(Vec<StatusToAssert>),
}
1003
/// A [`HealthOperator`] that forwards every recorded status to the test driver.
struct TestWriter {
    /// Where recorded statuses are sent.
    sender: UnboundedSender<StatusToAssert>,
    /// Maps each tracked collection id to its index in the test.
    input_mapping: BTreeMap<GlobalId, usize>,
}
1008
1009 impl HealthOperator for TestWriter {
1010 fn record_new_status(
1011 &self,
1012 collection_id: GlobalId,
1013 _ts: DateTime<Utc>,
1014 status: Status,
1015 new_error: Option<&str>,
1016 hints: &BTreeSet<String>,
1017 namespaced_errors: &BTreeMap<StatusNamespace, String>,
1018 write_namespaced_map: bool,
1019 ) {
1020 let _ = self.sender.send(StatusToAssert {
1021 collection_index: *self.input_mapping.get(&collection_id).unwrap(),
1022 status,
1023 error: new_error.map(str::to_string),
1024 errors: if !namespaced_errors.is_empty() && write_namespaced_map {
1025 Some(
1026 namespaced_errors
1027 .iter()
1028 .map(|(ns, err)| format!("{}: {}", ns, err))
1029 .join(", "),
1030 )
1031 } else {
1032 None
1033 },
1034 hint: if !hints.is_empty() {
1035 Some(hints.iter().join(", "))
1036 } else {
1037 None
1038 },
1039 });
1040 }
1041
1042 fn send_halt(&self, _id: GlobalId, _error: Option<(StatusNamespace, HealthStatusUpdate)>) {
1043 unimplemented!()
1045 }
1046
1047 fn chosen_worker(&self) -> Option<usize> {
1048 Some(0)
1050 }
1051 }
1052
1053 fn health_operator_runner(
1056 workers: usize,
1057 inputs: usize,
1058 write_namespaced_map: bool,
1059 steps: Vec<Step>,
1060 ) {
1061 let tokio_runtime = tokio::runtime::Runtime::new().unwrap();
1062 let tokio_handle = tokio_runtime.handle().clone();
1063
1064 let inputs: BTreeMap<GlobalId, usize> = (0..inputs)
1065 .map(|index| (GlobalId::User(u64::cast_from(index)), index))
1066 .collect();
1067
1068 timely::execute::execute(
1069 timely::execute::Config {
1070 communication: timely::CommunicationConfig::Process(workers),
1071 worker: Default::default(),
1072 },
1073 move |worker| {
1074 let steps = steps.clone();
1075 let inputs = inputs.clone();
1076
1077 let _tokio_guard = tokio_handle.enter();
1078 let (in_tx, in_rx) = unbounded_channel();
1079 let (out_tx, mut out_rx) = unbounded_channel();
1080
1081 worker.dataflow::<(), _, _>(|root_scope| {
1082 root_scope
1083 .clone()
1084 .scoped::<mz_repr::Timestamp, _, _>("gus", |scope| {
1085 let input = producer(root_scope.clone(), in_rx).enter(scope);
1086 Box::leak(Box::new(health_operator(
1087 scope,
1088 mz_ore::now::SYSTEM_TIME.clone(),
1089 inputs.keys().copied().collect(),
1090 *inputs.first_key_value().unwrap().0,
1091 "source_test",
1092 &input,
1093 TestWriter {
1094 sender: out_tx,
1095 input_mapping: inputs,
1096 },
1097 write_namespaced_map,
1098 Duration::from_secs(5),
1099 )));
1100 });
1101 });
1102
1103 if worker.index() == 0 {
1105 use Step::*;
1106 for step in steps {
1107 match step {
1108 Update(update) => {
1109 let _ = in_tx.send(update);
1110 }
1111 AssertStatus(mut statuses) => loop {
1112 match out_rx.try_recv() {
1113 Err(_) => {
1114 worker.step();
1115 std::thread::sleep(std::time::Duration::from_millis(50));
1117 }
1118 Ok(update) => {
1119 let pos = statuses
1120 .iter()
1121 .position(|s| {
1122 s.collection_index == update.collection_index
1123 })
1124 .unwrap();
1125
1126 let status_to_assert = &statuses[pos];
1127 assert_eq!(&update, status_to_assert);
1128
1129 statuses.remove(pos);
1130 if statuses.is_empty() {
1131 break;
1132 }
1133 }
1134 }
1135 },
1136 }
1137 }
1138
1139 assert_err!(out_rx.try_recv());
1141 }
1142 },
1143 )
1144 .unwrap();
1145 }
1146
1147 fn producer<G: Scope<Timestamp = ()>>(
1154 scope: G,
1155 mut input: UnboundedReceiver<TestUpdate>,
1156 ) -> Stream<G, HealthStatusMessage> {
1157 let mut iterator = AsyncOperatorBuilder::new("iterator".to_string(), scope.clone());
1158 let (output_handle, output) = iterator.new_output::<CapacityContainerBuilder<Vec<_>>>();
1159
1160 let index = scope.index();
1161 iterator.build(|mut caps| async move {
1162 if index != 0 {
1164 return;
1165 }
1166 let mut capability = Some(caps.pop().unwrap());
1167 while let Some(element) = input.recv().await {
1168 output_handle.give(
1169 capability.as_ref().unwrap(),
1170 (
1171 element.worker_id,
1172 element.id,
1173 element.namespace,
1174 element.update,
1175 ),
1176 );
1177 }
1178
1179 capability.take();
1180 });
1181
1182 let output = output.exchange(|d| d.0).map(|d| HealthStatusMessage {
1183 id: d.1,
1184 namespace: d.2,
1185 update: d.3,
1186 });
1187
1188 output
1189 }
1190}