1use std::cell::RefCell;
13use std::collections::btree_map::Entry;
14use std::collections::{BTreeMap, BTreeSet};
15use std::fmt;
16use std::fmt::Debug;
17use std::rc::Rc;
18use std::time::Duration;
19
20use chrono::{DateTime, Utc};
21use differential_dataflow::Hashable;
22use futures::StreamExt;
23use mz_ore::cast::CastFrom;
24use mz_ore::now::NowFn;
25use mz_repr::GlobalId;
26use mz_storage_client::client::{Status, StatusUpdate};
27use mz_timely_util::builder_async::{
28 Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
29};
30use timely::dataflow::channels::pact::Exchange;
31use timely::dataflow::operators::Map;
32use timely::dataflow::{Scope, Stream};
33use tracing::{error, info};
34
35use crate::internal_control::{InternalCommandSender, InternalStorageCommand};
36
/// Identifies the component that produced a health update.
///
/// The derived `Ord` follows declaration order and is relied upon:
/// `PerWorkerHealthStatus::decide_status` collects errors in a map keyed by
/// namespace and reports the entry with the *greatest* key as the headline
/// error, so later variants take display priority over earlier ones.
///
/// `Ssh` is the only namespace that `is_sidechannel` reports as a sidechannel;
/// review that method whenever a variant is added here.
#[derive(Copy, Clone, Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub enum StatusNamespace {
    Generator,
    Kafka,
    Postgres,
    MySql,
    SqlServer,
    Ssh,
    Upsert,
    Decode,
    Internal,
}
58
59impl StatusNamespace {
60 fn is_sidechannel(&self) -> bool {
61 matches!(self, StatusNamespace::Ssh)
62 }
63}
64
65impl fmt::Display for StatusNamespace {
66 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
67 use StatusNamespace::*;
68 match self {
69 Generator => write!(f, "generator"),
70 Kafka => write!(f, "kafka"),
71 Postgres => write!(f, "postgres"),
72 MySql => write!(f, "mysql"),
73 SqlServer => write!(f, "sql-server"),
74 Ssh => write!(f, "ssh"),
75 Upsert => write!(f, "upsert"),
76 Decode => write!(f, "decode"),
77 Internal => write!(f, "internal"),
78 }
79 }
80}
81
/// The health updates currently retained for one object, tracked separately
/// for each worker and namespace.
#[derive(Debug)]
struct PerWorkerHealthStatus {
    /// Indexed by worker id. Each element maps a namespace to the update
    /// currently retained for it on that worker.
    pub(crate) errors_by_worker: Vec<BTreeMap<StatusNamespace, HealthStatusUpdate>>,
}
86
87impl PerWorkerHealthStatus {
88 fn merge_update(
89 &mut self,
90 worker: usize,
91 namespace: StatusNamespace,
92 update: HealthStatusUpdate,
93 only_greater: bool,
94 ) {
95 let errors = &mut self.errors_by_worker[worker];
96 match errors.entry(namespace) {
97 Entry::Vacant(v) => {
98 v.insert(update);
99 }
100 Entry::Occupied(mut o) => {
101 if !only_greater || o.get() < &update {
102 o.insert(update);
103 }
104 }
105 }
106 }
107
108 fn decide_status(&self) -> OverallStatus {
109 let mut output_status = OverallStatus::Starting;
110 let mut namespaced_errors: BTreeMap<StatusNamespace, String> = BTreeMap::new();
111 let mut hints: BTreeSet<String> = BTreeSet::new();
112
113 for status in self.errors_by_worker.iter() {
114 for (ns, ns_status) in status.iter() {
115 match ns_status {
116 HealthStatusUpdate::Ceased { error } => {
121 if Some(error) > namespaced_errors.get(ns).as_deref() {
122 namespaced_errors.insert(*ns, error.to_string());
123 }
124 }
125 HealthStatusUpdate::Stalled { error, hint, .. } => {
126 if Some(error) > namespaced_errors.get(ns).as_deref() {
127 namespaced_errors.insert(*ns, error.to_string());
128 }
129
130 if let Some(hint) = hint {
131 hints.insert(hint.to_string());
132 }
133 }
134 HealthStatusUpdate::Running => {
135 if !ns.is_sidechannel() {
136 output_status = OverallStatus::Running;
137 }
138 }
139 }
140 }
141 }
142
143 if !namespaced_errors.is_empty() {
144 let (ns, err) = namespaced_errors.last_key_value().unwrap();
146 output_status = OverallStatus::Stalled {
147 error: format!("{}: {}", ns, err),
148 hints,
149 namespaced_errors,
150 }
151 }
152
153 output_status
154 }
155}
156
/// The aggregated health of a single object, as produced by
/// `PerWorkerHealthStatus::decide_status`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub enum OverallStatus {
    /// No errors are recorded and no non-sidechannel namespace is running.
    Starting,
    /// No errors are recorded and some non-sidechannel namespace is running.
    Running,
    /// At least one namespace on some worker reports an error.
    Stalled {
        /// Headline error, formatted as `"<namespace>: <error>"`.
        error: String,
        /// Every hint attached to a currently retained stalled update.
        hints: BTreeSet<String>,
        /// The error selected for each namespace that has one.
        namespaced_errors: BTreeMap<StatusNamespace, String>,
    },
    // NOTE(review): `decide_status` in this file never constructs this
    // variant; it folds `HealthStatusUpdate::Ceased` into `Stalled`.
    Ceased {
        error: String,
    },
}
170
171impl OverallStatus {
172 pub(crate) fn error(&self) -> Option<&str> {
174 match self {
175 OverallStatus::Starting | OverallStatus::Running => None,
176 OverallStatus::Stalled { error, .. } | OverallStatus::Ceased { error, .. } => {
177 Some(error)
178 }
179 }
180 }
181
182 pub(crate) fn errors(&self) -> Option<&BTreeMap<StatusNamespace, String>> {
184 match self {
185 OverallStatus::Starting | OverallStatus::Running | OverallStatus::Ceased { .. } => None,
186 OverallStatus::Stalled {
187 namespaced_errors, ..
188 } => Some(namespaced_errors),
189 }
190 }
191
192 pub(crate) fn hints(&self) -> BTreeSet<String> {
194 match self {
195 OverallStatus::Starting | OverallStatus::Running | OverallStatus::Ceased { .. } => {
196 BTreeSet::new()
197 }
198 OverallStatus::Stalled { hints, .. } => hints.clone(),
199 }
200 }
201}
202
203impl<'a> From<&'a OverallStatus> for Status {
204 fn from(val: &'a OverallStatus) -> Self {
205 match val {
206 OverallStatus::Starting => Status::Starting,
207 OverallStatus::Running => Status::Running,
208 OverallStatus::Stalled { .. } => Status::Stalled,
209 OverallStatus::Ceased { .. } => Status::Ceased,
210 }
211 }
212}
213
/// Everything the health operator tracks for a single object id.
#[derive(Debug)]
struct HealthState {
    /// The updates currently retained per worker and namespace.
    healths: PerWorkerHealthStatus,
    /// The status most recently passed to `HealthOperator::record_new_status`,
    /// used to suppress reports that would not change anything.
    last_reported_status: Option<OverallStatus>,
    /// The most recent update for which `should_halt()` returned `true`,
    /// together with its namespace.
    halt_with: Option<(StatusNamespace, HealthStatusUpdate)>,
}
220
221impl HealthState {
222 fn new(worker_count: usize) -> HealthState {
223 HealthState {
224 healths: PerWorkerHealthStatus {
225 errors_by_worker: vec![Default::default(); worker_count],
226 },
227 last_reported_status: None,
228 halt_with: None,
229 }
230 }
231}
232
/// The sink side of the health operator: receives status transitions and halt
/// requests computed by `health_operator`.
pub trait HealthOperator {
    /// Records that `collection_id` transitioned to `new_status` at `ts`.
    ///
    /// `new_error`, `hints` and `namespaced_errors` carry the details of the
    /// status. `write_namespaced_map` is forwarded unchanged from the
    /// `health_operator` call; implementations decide how to honor it.
    fn record_new_status(
        &self,
        collection_id: GlobalId,
        ts: DateTime<Utc>,
        new_status: Status,
        new_error: Option<&str>,
        hints: &BTreeSet<String>,
        namespaced_errors: &BTreeMap<StatusNamespace, String>,
        write_namespaced_map: bool,
    );
    /// Requests a halt for `id`; `error` is the namespace and update that
    /// asked for it.
    fn send_halt(&self, id: GlobalId, error: Option<(StatusNamespace, HealthStatusUpdate)>);

    /// Optionally pins the worker that processes all health updates.
    ///
    /// The default of `None` lets `health_operator` pick the worker itself.
    fn chosen_worker(&self) -> Option<usize> {
        None
    }
}
260
/// A `HealthOperator` that buffers status updates in memory and requests
/// halts through the internal command channel.
pub struct DefaultWriter {
    /// Channel on which `send_halt` issues a `SuspendAndRestart` command.
    pub command_tx: InternalCommandSender,
    /// Shared buffer that `record_new_status` appends to.
    pub updates: Rc<RefCell<Vec<StatusUpdate>>>,
}
266
267impl HealthOperator for DefaultWriter {
268 fn record_new_status(
269 &self,
270 collection_id: GlobalId,
271 ts: DateTime<Utc>,
272 status: Status,
273 new_error: Option<&str>,
274 hints: &BTreeSet<String>,
275 namespaced_errors: &BTreeMap<StatusNamespace, String>,
276 write_namespaced_map: bool,
277 ) {
278 self.updates.borrow_mut().push(StatusUpdate {
279 id: collection_id,
280 timestamp: ts,
281 status,
282 error: new_error.map(|e| e.to_string()),
283 hints: hints.clone(),
284 namespaced_errors: if write_namespaced_map {
285 namespaced_errors
286 .iter()
287 .map(|(ns, val)| (ns.to_string(), val.clone()))
288 .collect()
289 } else {
290 BTreeMap::new()
291 },
292 replica_id: None,
293 });
294 }
295
296 fn send_halt(&self, id: GlobalId, error: Option<(StatusNamespace, HealthStatusUpdate)>) {
297 self.command_tx
298 .send(InternalStorageCommand::SuspendAndRestart {
299 id,
302 reason: format!("{:?}", error),
303 });
304 }
305}
306
/// A single health update flowing into `health_operator`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub struct HealthStatusMessage {
    /// The object the update is about. `None` is resolved by
    /// `health_operator` to its `halting_id` argument.
    pub id: Option<GlobalId>,
    /// The component that produced the update.
    pub namespace: StatusNamespace,
    /// The update itself.
    pub update: HealthStatusUpdate,
}
318
319pub(crate) fn health_operator<G, P>(
327 scope: &G,
328 now: NowFn,
329 mark_starting: BTreeSet<GlobalId>,
331 halting_id: GlobalId,
334 object_type: &'static str,
336 health_stream: &Stream<G, HealthStatusMessage>,
338 health_operator_impl: P,
340 write_namespaced_map: bool,
342 suspend_and_restart_delay: Duration,
345) -> PressOnDropButton
346where
347 G: Scope,
348 P: HealthOperator + 'static,
349{
350 let healthcheck_worker_id = scope.index();
352 let worker_count = scope.peers();
353
354 let health_stream = health_stream.map(move |status| (healthcheck_worker_id, status));
356
357 let chosen_worker_id = if let Some(index) = health_operator_impl.chosen_worker() {
358 index
359 } else {
360 usize::cast_from(mark_starting.iter().next().hashed()) % worker_count
363 };
364
365 let is_active_worker = chosen_worker_id == healthcheck_worker_id;
366
367 let operator_name = format!("healthcheck({})", healthcheck_worker_id);
368 let mut health_op = AsyncOperatorBuilder::new(operator_name, scope.clone());
369
370 let mut input = health_op.new_disconnected_input(
371 &health_stream,
372 Exchange::new(move |_| u64::cast_from(chosen_worker_id)),
373 );
374
375 let button = health_op.build(move |mut _capabilities| async move {
376 let mut health_states: BTreeMap<_, _> = mark_starting
377 .iter()
378 .copied()
379 .chain([halting_id])
380 .map(|id| (id, HealthState::new(worker_count)))
381 .collect();
382
383 if is_active_worker {
385 for (id, state) in health_states.iter_mut() {
386 if mark_starting.contains(id) {
387 let status = OverallStatus::Starting;
388 let timestamp = mz_ore::now::to_datetime(now());
389 health_operator_impl.record_new_status(
390 *id,
391 timestamp,
392 (&status).into(),
393 status.error(),
394 &status.hints(),
395 status.errors().unwrap_or(&BTreeMap::new()),
396 write_namespaced_map,
397 );
398
399 state.last_reported_status = Some(status);
400 }
401 }
402 }
403
404 let mut outputs_seen = BTreeMap::<GlobalId, BTreeSet<_>>::new();
405 while let Some(event) = input.next().await {
406 if let AsyncEvent::Data(_cap, rows) = event {
407 for (worker_id, message) in rows {
408 let HealthStatusMessage {
409 id,
410 namespace: ns,
411 update: health_event,
412 } = message;
413 let id = id.unwrap_or(halting_id);
414 let HealthState {
415 healths, halt_with, ..
416 } = match health_states.get_mut(&id) {
417 Some(health) => health,
418 None => continue,
422 };
423
424 let new_round = outputs_seen
427 .entry(id)
428 .or_insert_with(BTreeSet::new)
429 .insert(ns.clone());
430
431 if !is_active_worker {
432 error!(
433 "Health messages for {object_type} {id} passed to \
434 an unexpected worker id: {healthcheck_worker_id}"
435 )
436 }
437
438 if health_event.should_halt() {
439 *halt_with = Some((ns.clone(), health_event.clone()));
440 }
441
442 healths.merge_update(worker_id, ns, health_event, !new_round);
443 }
444
445 let mut halt_with_outer = None;
446
447 while let Some((id, _)) = outputs_seen.pop_first() {
448 let HealthState {
449 healths,
450 last_reported_status,
451 halt_with,
452 } = health_states.get_mut(&id).expect("known to exist");
453
454 let new_status = healths.decide_status();
455
456 if Some(&new_status) != last_reported_status.as_ref() {
457 info!(
458 "Health transition for {object_type} {id}: \
459 {last_reported_status:?} -> {:?}",
460 Some(&new_status)
461 );
462
463 let timestamp = mz_ore::now::to_datetime(now());
464 health_operator_impl.record_new_status(
465 id,
466 timestamp,
467 (&new_status).into(),
468 new_status.error(),
469 &new_status.hints(),
470 new_status.errors().unwrap_or(&BTreeMap::new()),
471 write_namespaced_map,
472 );
473
474 *last_reported_status = Some(new_status.clone());
475 }
476
477 if halt_with_outer.is_none() && halt_with.is_some() {
479 halt_with_outer = Some((id, halt_with.clone()));
480 }
481 }
482
483 if let Some((id, halt_with)) = halt_with_outer {
488 mz_ore::soft_assert_or_log!(
489 id == halting_id,
490 "sub{object_type}s should not produce \
491 halting errors, however {:?} halted while primary \
492 {object_type} is {:?}",
493 id,
494 halting_id
495 );
496
497 info!(
498 "Broadcasting suspend-and-restart \
499 command because of {:?} after {:?} delay",
500 halt_with, suspend_and_restart_delay
501 );
502 tokio::time::sleep(suspend_and_restart_delay).await;
503 health_operator_impl.send_halt(id, halt_with);
504 }
505 }
506 }
507 });
508
509 button.press_on_drop()
510}
511
512use serde::{Deserialize, Serialize};
513
/// A health update reported for a single namespace.
///
/// The derived `Ord` (`Running < Stalled < Ceased`, then by field values) is
/// what `PerWorkerHealthStatus::merge_update` compares when it is asked to
/// keep only greater updates.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub enum HealthStatusUpdate {
    Running,
    Stalled {
        /// The error message.
        error: String,
        /// An optional hint reported alongside the error.
        hint: Option<String>,
        /// Whether this update asks for a halt; see `should_halt`.
        should_halt: bool,
    },
    Ceased {
        /// The error message.
        error: String,
    },
}
529
530impl HealthStatusUpdate {
531 pub(crate) fn running() -> Self {
533 HealthStatusUpdate::Running
534 }
535
536 pub(crate) fn stalled(error: String, hint: Option<String>) -> Self {
538 HealthStatusUpdate::Stalled {
539 error,
540 hint,
541 should_halt: false,
542 }
543 }
544
545 pub(crate) fn halting(error: String, hint: Option<String>) -> Self {
547 HealthStatusUpdate::Stalled {
548 error,
549 hint,
550 should_halt: true,
551 }
552 }
553
554 pub(crate) fn should_halt(&self) -> bool {
562 match self {
563 HealthStatusUpdate::Running |
564 HealthStatusUpdate::Ceased { .. } => false,
569 HealthStatusUpdate::Stalled { should_halt, .. } => *should_halt,
570 }
571 }
572}
573
574#[cfg(test)]
575mod tests {
576 use super::*;
577 use itertools::Itertools;
578
    /// Exercises basic transitions with 2 workers and 2 collections.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn test_health_operator_basic() {
        use Step::*;

        // Arguments: worker count, collection count, `write_namespaced_map`.
        health_operator_runner(
            2,
            2,
            true,
            vec![
                // Both collections are initially reported as starting.
                AssertStatus(vec![
                    StatusToAssert {
                        collection_index: 0,
                        status: Status::Starting,
                        ..Default::default()
                    },
                    StatusToAssert {
                        collection_index: 1,
                        status: Status::Starting,
                        ..Default::default()
                    },
                ]),
                // `id: None` resolves to the halting id, which the runner sets
                // to the first collection (index 0).
                Update(TestUpdate {
                    worker_id: 1,
                    namespace: StatusNamespace::Generator,
                    id: None,
                    update: HealthStatusUpdate::running(),
                }),
                AssertStatus(vec![StatusToAssert {
                    collection_index: 0,
                    status: Status::Running,
                    ..Default::default()
                }]),
                // The second collection starts running via worker 1.
                Update(TestUpdate {
                    worker_id: 1,
                    namespace: StatusNamespace::Generator,
                    id: Some(GlobalId::User(1)),
                    update: HealthStatusUpdate::running(),
                }),
                AssertStatus(vec![StatusToAssert {
                    collection_index: 1,
                    status: Status::Running,
                    ..Default::default()
                }]),
                // A stall on a single worker stalls the whole collection.
                Update(TestUpdate {
                    worker_id: 0,
                    namespace: StatusNamespace::Generator,
                    id: Some(GlobalId::User(1)),
                    update: HealthStatusUpdate::stalled("uhoh".to_string(), None),
                }),
                AssertStatus(vec![StatusToAssert {
                    collection_index: 1,
                    status: Status::Stalled,
                    error: Some("generator: uhoh".to_string()),
                    errors: Some("generator: uhoh".to_string()),
                    ..Default::default()
                }]),
                // Once that worker recovers, the collection is running again.
                Update(TestUpdate {
                    worker_id: 0,
                    namespace: StatusNamespace::Generator,
                    id: Some(GlobalId::User(1)),
                    update: HealthStatusUpdate::running(),
                }),
                AssertStatus(vec![StatusToAssert {
                    collection_index: 1,
                    status: Status::Running,
                    ..Default::default()
                }]),
            ],
        );
    }
662
    /// With `write_namespaced_map` disabled, the headline error is still
    /// reported but the per-namespace errors are not.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn test_health_operator_write_namespaced_map() {
        use Step::*;

        // Arguments: worker count, collection count, `write_namespaced_map`.
        health_operator_runner(
            2,
            2,
            false,
            vec![
                // Both collections are initially reported as starting.
                AssertStatus(vec![
                    StatusToAssert {
                        collection_index: 0,
                        status: Status::Starting,
                        ..Default::default()
                    },
                    StatusToAssert {
                        collection_index: 1,
                        status: Status::Starting,
                        ..Default::default()
                    },
                ]),
                Update(TestUpdate {
                    worker_id: 0,
                    namespace: StatusNamespace::Generator,
                    id: Some(GlobalId::User(1)),
                    update: HealthStatusUpdate::stalled("uhoh".to_string(), None),
                }),
                // `errors` stays `None` because the namespaced map is off.
                AssertStatus(vec![StatusToAssert {
                    collection_index: 1,
                    status: Status::Stalled,
                    error: Some("generator: uhoh".to_string()),
                    errors: None,
                    ..Default::default()
                }]),
            ],
        )
    }
704
    /// Errors from several namespaces are tracked independently, and the
    /// greatest namespace with an error provides the headline.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn test_health_operator_namespaces() {
        use Step::*;

        // Arguments: worker count, collection count, `write_namespaced_map`.
        health_operator_runner(
            2,
            1,
            true,
            vec![
                // The single collection is initially reported as starting.
                AssertStatus(vec![
                    StatusToAssert {
                        collection_index: 0,
                        status: Status::Starting,
                        ..Default::default()
                    },
                ]),
                // Stall the `Generator` namespace.
                Update(TestUpdate {
                    worker_id: 0,
                    namespace: StatusNamespace::Generator,
                    id: None,
                    update: HealthStatusUpdate::stalled("uhoh".to_string(), None),
                }),
                AssertStatus(vec![StatusToAssert {
                    collection_index: 0,
                    status: Status::Stalled,
                    error: Some("generator: uhoh".to_string()),
                    errors: Some("generator: uhoh".to_string()),
                    ..Default::default()
                }]),
                // `Kafka` sorts after `Generator`, so it takes over the
                // headline while both errors remain listed.
                Update(TestUpdate {
                    worker_id: 0,
                    namespace: StatusNamespace::Kafka,
                    id: None,
                    update: HealthStatusUpdate::stalled("uhoh".to_string(), None),
                }),
                AssertStatus(vec![StatusToAssert {
                    collection_index: 0,
                    status: Status::Stalled,
                    error: Some("kafka: uhoh".to_string()),
                    errors: Some("generator: uhoh, kafka: uhoh".to_string()),
                    ..Default::default()
                }]),
                // `Kafka` recovers; the `Generator` error is still present.
                Update(TestUpdate {
                    worker_id: 0,
                    namespace: StatusNamespace::Kafka,
                    id: None,
                    update: HealthStatusUpdate::running(),
                }),
                AssertStatus(vec![StatusToAssert {
                    collection_index: 0,
                    status: Status::Stalled,
                    error: Some("generator: uhoh".to_string()),
                    errors: Some("generator: uhoh".to_string()),
                    ..Default::default()
                }]),
                // With every namespace running, the collection is running.
                Update(TestUpdate {
                    worker_id: 0,
                    namespace: StatusNamespace::Generator,
                    id: None,
                    update: HealthStatusUpdate::running(),
                }),
                AssertStatus(vec![StatusToAssert {
                    collection_index: 0,
                    status: Status::Running,
                    ..Default::default()
                }]),
            ],
        );
    }
781
    /// A sidechannel namespace (`Ssh`) can stall the collection, but its
    /// `Running` update does not mark the collection as running.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn test_health_operator_namespace_side_channel() {
        use Step::*;

        // Arguments: worker count, collection count, `write_namespaced_map`.
        health_operator_runner(
            2,
            1,
            true,
            vec![
                // The single collection is initially reported as starting.
                AssertStatus(vec![
                    StatusToAssert {
                        collection_index: 0,
                        status: Status::Starting,
                        ..Default::default()
                    },
                ]),
                // A sidechannel error stalls the collection.
                Update(TestUpdate {
                    worker_id: 0,
                    namespace: StatusNamespace::Ssh,
                    id: None,
                    update: HealthStatusUpdate::stalled("uhoh".to_string(), None),
                }),
                AssertStatus(vec![StatusToAssert {
                    collection_index: 0,
                    status: Status::Stalled,
                    error: Some("ssh: uhoh".to_string()),
                    errors: Some("ssh: uhoh".to_string()),
                    ..Default::default()
                }]),
                // A newer error from the same namespace replaces the old one.
                Update(TestUpdate {
                    worker_id: 0,
                    namespace: StatusNamespace::Ssh,
                    id: None,
                    update: HealthStatusUpdate::stalled("uhoh2".to_string(), None),
                }),
                AssertStatus(vec![StatusToAssert {
                    collection_index: 0,
                    status: Status::Stalled,
                    error: Some("ssh: uhoh2".to_string()),
                    errors: Some("ssh: uhoh2".to_string()),
                    ..Default::default()
                }]),
                // The sidechannel recovering clears the error, but only takes
                // the collection back to starting, not running.
                Update(TestUpdate {
                    worker_id: 0,
                    namespace: StatusNamespace::Ssh,
                    id: None,
                    update: HealthStatusUpdate::running(),
                }),
                AssertStatus(vec![StatusToAssert {
                    collection_index: 0,
                    status: Status::Starting,
                    ..Default::default()
                }]),
                // A non-sidechannel namespace running marks it as running.
                Update(TestUpdate {
                    worker_id: 0,
                    namespace: StatusNamespace::Generator,
                    id: None,
                    update: HealthStatusUpdate::running(),
                }),
                AssertStatus(vec![StatusToAssert {
                    collection_index: 0,
                    status: Status::Running,
                    ..Default::default()
                }]),
            ],
        );
    }
855
    /// Hints from all workers' stalled updates are merged, and a hint goes
    /// away once the update that carried it is replaced.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn test_health_operator_hints() {
        use Step::*;

        // Arguments: worker count, collection count, `write_namespaced_map`.
        health_operator_runner(
            2,
            1,
            true,
            vec![
                // The single collection is initially reported as starting.
                AssertStatus(vec![
                    StatusToAssert {
                        collection_index: 0,
                        status: Status::Starting,
                        ..Default::default()
                    },
                ]),
                // Worker 0 stalls with a hint.
                Update(TestUpdate {
                    worker_id: 0,
                    namespace: StatusNamespace::Generator,
                    id: None,
                    update: HealthStatusUpdate::stalled(
                        "uhoh".to_string(),
                        Some("hint1".to_string()),
                    ),
                }),
                AssertStatus(vec![StatusToAssert {
                    collection_index: 0,
                    status: Status::Stalled,
                    error: Some("generator: uhoh".to_string()),
                    errors: Some("generator: uhoh".to_string()),
                    hint: Some("hint1".to_string()),
                }]),
                // Worker 1 stalls too: the greater error string ("uhoh2")
                // wins for the namespace, and both hints are reported.
                Update(TestUpdate {
                    worker_id: 1,
                    namespace: StatusNamespace::Generator,
                    id: None,
                    update: HealthStatusUpdate::stalled(
                        "uhoh2".to_string(),
                        Some("hint2".to_string()),
                    ),
                }),
                AssertStatus(vec![StatusToAssert {
                    collection_index: 0,
                    status: Status::Stalled,
                    error: Some("generator: uhoh2".to_string()),
                    errors: Some("generator: uhoh2".to_string()),
                    hint: Some("hint1, hint2".to_string()),
                }]),
                // Worker 1's update is replaced, so `hint2` gives way to
                // `hint3`.
                Update(TestUpdate {
                    worker_id: 1,
                    namespace: StatusNamespace::Generator,
                    id: None,
                    update: HealthStatusUpdate::stalled(
                        "uhoh2".to_string(),
                        Some("hint3".to_string()),
                    ),
                }),
                AssertStatus(vec![StatusToAssert {
                    collection_index: 0,
                    status: Status::Stalled,
                    error: Some("generator: uhoh2".to_string()),
                    errors: Some("generator: uhoh2".to_string()),
                    hint: Some("hint1, hint3".to_string()),
                }]),
                // Worker 0 recovers: its hint disappears, but worker 1 is
                // still stalled.
                Update(TestUpdate {
                    worker_id: 0,
                    namespace: StatusNamespace::Generator,
                    id: None,
                    update: HealthStatusUpdate::running(),
                }),
                AssertStatus(vec![StatusToAssert {
                    collection_index: 0,
                    status: Status::Stalled,
                    error: Some("generator: uhoh2".to_string()),
                    errors: Some("generator: uhoh2".to_string()),
                    hint: Some("hint3".to_string()),
                }]),
                // Worker 1 recovers as well: the collection is running.
                Update(TestUpdate {
                    worker_id: 1,
                    namespace: StatusNamespace::Generator,
                    id: None,
                    update: HealthStatusUpdate::running(),
                }),
                AssertStatus(vec![StatusToAssert {
                    collection_index: 0,
                    status: Status::Running,
                    ..Default::default()
                }]),
            ],
        );
    }
955
956 use mz_ore::assert_err;
959 use timely::container::CapacityContainerBuilder;
960 use timely::dataflow::Scope;
961 use timely::dataflow::operators::Enter;
962 use timely::dataflow::operators::exchange::Exchange;
963 use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
964
    /// A status report as captured by `TestWriter`, and the shape tests use
    /// to state what they expect.
    #[derive(Debug, Clone, PartialEq, Eq)]
    struct StatusToAssert {
        /// Index of the collection, as assigned by `health_operator_runner`.
        collection_index: usize,
        /// The reported status.
        status: Status,
        /// The headline error, if any.
        error: Option<String>,
        /// The namespaced errors rendered as `"<ns>: <err>"` joined by `", "`,
        /// or `None` when there are none or the namespaced map is disabled.
        errors: Option<String>,
        /// The hints joined by `", "`, or `None` when there are none.
        hint: Option<String>,
    }
974
975 impl Default for StatusToAssert {
976 fn default() -> Self {
977 StatusToAssert {
978 collection_index: Default::default(),
979 status: Status::Running,
980 error: Default::default(),
981 errors: Default::default(),
982 hint: Default::default(),
983 }
984 }
985 }
986
    /// A health update to inject into the operator under test.
    #[derive(Debug, Clone)]
    struct TestUpdate {
        /// The worker the update is routed to by `producer`, and therefore
        /// the worker it is attributed to by `health_operator`.
        worker_id: u64,
        /// The namespace the update is reported under.
        namespace: StatusNamespace,
        /// The target collection; `None` resolves to the halting id.
        id: Option<GlobalId>,
        /// The update itself.
        update: HealthStatusUpdate,
    }
996
    /// One step of a scripted `health_operator_runner` scenario.
    #[derive(Debug, Clone)]
    enum Step {
        /// Inject a health update.
        Update(TestUpdate),
        /// Wait until one report has been received for each listed entry
        /// (matched by `collection_index`) and assert that each is equal to it.
        AssertStatus(Vec<StatusToAssert>),
    }
1005
    /// A `HealthOperator` that forwards every report to a channel so the test
    /// driver can assert on it.
    struct TestWriter {
        /// Channel on which captured reports are sent.
        sender: UnboundedSender<StatusToAssert>,
        /// Maps each collection id to its index in the test scenario.
        input_mapping: BTreeMap<GlobalId, usize>,
    }
1010
1011 impl HealthOperator for TestWriter {
1012 fn record_new_status(
1013 &self,
1014 collection_id: GlobalId,
1015 _ts: DateTime<Utc>,
1016 status: Status,
1017 new_error: Option<&str>,
1018 hints: &BTreeSet<String>,
1019 namespaced_errors: &BTreeMap<StatusNamespace, String>,
1020 write_namespaced_map: bool,
1021 ) {
1022 let _ = self.sender.send(StatusToAssert {
1023 collection_index: *self.input_mapping.get(&collection_id).unwrap(),
1024 status,
1025 error: new_error.map(str::to_string),
1026 errors: if !namespaced_errors.is_empty() && write_namespaced_map {
1027 Some(
1028 namespaced_errors
1029 .iter()
1030 .map(|(ns, err)| format!("{}: {}", ns, err))
1031 .join(", "),
1032 )
1033 } else {
1034 None
1035 },
1036 hint: if !hints.is_empty() {
1037 Some(hints.iter().join(", "))
1038 } else {
1039 None
1040 },
1041 });
1042 }
1043
1044 fn send_halt(&self, _id: GlobalId, _error: Option<(StatusNamespace, HealthStatusUpdate)>) {
1045 unimplemented!()
1047 }
1048
1049 fn chosen_worker(&self) -> Option<usize> {
1050 Some(0)
1052 }
1053 }
1054
    /// Runs a scripted scenario against `health_operator`.
    ///
    /// Spins up `workers` timely workers in-process, registers `inputs`
    /// collections (`GlobalId::User(0..inputs)`, the first of which is also
    /// the halting id), and then has worker 0 execute `steps` in order.
    /// After the last step it asserts that no further report is pending.
    fn health_operator_runner(
        workers: usize,
        inputs: usize,
        write_namespaced_map: bool,
        steps: Vec<Step>,
    ) {
        let tokio_runtime = tokio::runtime::Runtime::new().unwrap();
        let tokio_handle = tokio_runtime.handle().clone();

        // Collection id -> collection index used in `StatusToAssert`.
        let inputs: BTreeMap<GlobalId, usize> = (0..inputs)
            .map(|index| (GlobalId::User(u64::cast_from(index)), index))
            .collect();

        timely::execute::execute(
            timely::execute::Config {
                communication: timely::CommunicationConfig::Process(workers),
                worker: Default::default(),
            },
            move |worker| {
                let steps = steps.clone();
                let inputs = inputs.clone();

                let _tokio_guard = tokio_handle.enter();
                // `in_*` carries injected updates; `out_*` carries reports.
                let (in_tx, in_rx) = unbounded_channel();
                let (out_tx, mut out_rx) = unbounded_channel();

                worker.dataflow::<(), _, _>(|root_scope| {
                    root_scope
                        .clone()
                        .scoped::<mz_repr::Timestamp, _, _>("gus", |scope| {
                            let input = producer(root_scope.clone(), in_rx).enter(scope);
                            // Leak the returned button so it is never dropped
                            // (dropping it would press it).
                            Box::leak(Box::new(health_operator(
                                scope,
                                mz_ore::now::SYSTEM_TIME.clone(),
                                inputs.keys().copied().collect(),
                                *inputs.first_key_value().unwrap().0,
                                "source_test",
                                &input,
                                TestWriter {
                                    sender: out_tx,
                                    input_mapping: inputs,
                                },
                                write_namespaced_map,
                                Duration::from_secs(5),
                            )));
                        });
                });

                // Only worker 0 drives the script; `TestWriter` pins the
                // active health worker to the same index.
                if worker.index() == 0 {
                    use Step::*;
                    for step in steps {
                        match step {
                            Update(update) => {
                                let _ = in_tx.send(update);
                            }
                            AssertStatus(mut statuses) => loop {
                                match out_rx.try_recv() {
                                    Err(_) => {
                                        // Nothing reported yet: advance the
                                        // dataflow and retry shortly.
                                        worker.step();
                                        std::thread::sleep(std::time::Duration::from_millis(50));
                                    }
                                    Ok(update) => {
                                        // Expectations are matched by
                                        // collection, in any order.
                                        let pos = statuses
                                            .iter()
                                            .position(|s| {
                                                s.collection_index == update.collection_index
                                            })
                                            .unwrap();

                                        let status_to_assert = &statuses[pos];
                                        assert_eq!(&update, status_to_assert);

                                        statuses.remove(pos);
                                        if statuses.is_empty() {
                                            break;
                                        }
                                    }
                                }
                            },
                        }
                    }

                    // The script must account for every report.
                    assert_err!(out_rx.try_recv());
                }
            },
        )
        .unwrap();
    }
1148
1149 fn producer<G: Scope<Timestamp = ()>>(
1156 scope: G,
1157 mut input: UnboundedReceiver<TestUpdate>,
1158 ) -> Stream<G, HealthStatusMessage> {
1159 let mut iterator = AsyncOperatorBuilder::new("iterator".to_string(), scope.clone());
1160 let (output_handle, output) = iterator.new_output::<CapacityContainerBuilder<Vec<_>>>();
1161
1162 let index = scope.index();
1163 iterator.build(|mut caps| async move {
1164 if index != 0 {
1166 return;
1167 }
1168 let mut capability = Some(caps.pop().unwrap());
1169 while let Some(element) = input.recv().await {
1170 output_handle.give(
1171 capability.as_ref().unwrap(),
1172 (
1173 element.worker_id,
1174 element.id,
1175 element.namespace,
1176 element.update,
1177 ),
1178 );
1179 }
1180
1181 capability.take();
1182 });
1183
1184 let output = output.exchange(|d| d.0).map(|d| HealthStatusMessage {
1185 id: d.1,
1186 namespace: d.2,
1187 update: d.3,
1188 });
1189
1190 output
1191 }
1192}