1use std::cell::RefCell;
13use std::collections::btree_map::Entry;
14use std::collections::{BTreeMap, BTreeSet};
15use std::fmt;
16use std::fmt::Debug;
17use std::rc::Rc;
18use std::time::Duration;
19
20use chrono::{DateTime, Utc};
21use differential_dataflow::Hashable;
22use futures::StreamExt;
23use mz_ore::cast::CastFrom;
24use mz_ore::now::NowFn;
25use mz_repr::GlobalId;
26use mz_storage_client::client::{Status, StatusUpdate};
27use mz_timely_util::builder_async::{
28 Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
29};
30use timely::dataflow::channels::pact::Exchange;
31use timely::dataflow::operators::Map;
32use timely::dataflow::{Scope, Stream};
33use tracing::{error, info};
34
35use crate::internal_control::{InternalCommandSender, InternalStorageCommand};
36
/// The namespace under which a health status update is reported.
///
/// The derived `Ord` follows declaration order and is load-bearing: this type
/// keys the per-namespace error maps, and
/// `PerWorkerHealthStatus::decide_status` surfaces the error of the greatest
/// (latest-declared) namespace that currently has one as the headline error.
/// Reordering variants therefore changes which error is displayed.
///
/// Each variant is rendered to a lowercase label by the `Display` impl.
#[derive(
    Copy,
    Clone,
    Debug,
    Serialize,
    Deserialize,
    PartialEq,
    Eq,
    PartialOrd,
    Ord
)]
pub enum StatusNamespace {
    Generator,
    Kafka,
    Postgres,
    MySql,
    SqlServer,
    /// The only namespace that `is_sidechannel` reports as a side channel.
    Ssh,
    Upsert,
    Decode,
    Iceberg,
    Internal,
}
69
70impl StatusNamespace {
71 fn is_sidechannel(&self) -> bool {
72 matches!(self, StatusNamespace::Ssh)
73 }
74}
75
76impl fmt::Display for StatusNamespace {
77 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
78 use StatusNamespace::*;
79 match self {
80 Generator => write!(f, "generator"),
81 Kafka => write!(f, "kafka"),
82 Postgres => write!(f, "postgres"),
83 MySql => write!(f, "mysql"),
84 SqlServer => write!(f, "sql-server"),
85 Ssh => write!(f, "ssh"),
86 Upsert => write!(f, "upsert"),
87 Decode => write!(f, "decode"),
88 Internal => write!(f, "internal"),
89 Iceberg => write!(f, "iceberg"),
90 }
91 }
92}
93
/// The latest health update per namespace, tracked separately for each worker.
#[derive(Debug)]
struct PerWorkerHealthStatus {
    /// Indexed by worker index; each entry maps a namespace to the update most
    /// recently merged for it on that worker (see `merge_update`).
    pub(crate) errors_by_worker: Vec<BTreeMap<StatusNamespace, HealthStatusUpdate>>,
}
98
99impl PerWorkerHealthStatus {
100 fn merge_update(
101 &mut self,
102 worker: usize,
103 namespace: StatusNamespace,
104 update: HealthStatusUpdate,
105 only_greater: bool,
106 ) {
107 let errors = &mut self.errors_by_worker[worker];
108 match errors.entry(namespace) {
109 Entry::Vacant(v) => {
110 v.insert(update);
111 }
112 Entry::Occupied(mut o) => {
113 if !only_greater || o.get() < &update {
114 o.insert(update);
115 }
116 }
117 }
118 }
119
120 fn decide_status(&self) -> OverallStatus {
121 let mut output_status = OverallStatus::Starting;
122 let mut namespaced_errors: BTreeMap<StatusNamespace, String> = BTreeMap::new();
123 let mut hints: BTreeSet<String> = BTreeSet::new();
124
125 for status in self.errors_by_worker.iter() {
126 for (ns, ns_status) in status.iter() {
127 match ns_status {
128 HealthStatusUpdate::Ceased { error } => {
133 if Some(error) > namespaced_errors.get(ns).as_deref() {
134 namespaced_errors.insert(*ns, error.to_string());
135 }
136 }
137 HealthStatusUpdate::Stalled { error, hint, .. } => {
138 if Some(error) > namespaced_errors.get(ns).as_deref() {
139 namespaced_errors.insert(*ns, error.to_string());
140 }
141
142 if let Some(hint) = hint {
143 hints.insert(hint.to_string());
144 }
145 }
146 HealthStatusUpdate::Running => {
147 if !ns.is_sidechannel() {
148 output_status = OverallStatus::Running;
149 }
150 }
151 }
152 }
153 }
154
155 if !namespaced_errors.is_empty() {
156 let (ns, err) = namespaced_errors.last_key_value().unwrap();
158 output_status = OverallStatus::Stalled {
159 error: format!("{}: {}", ns, err),
160 hints,
161 namespaced_errors,
162 }
163 }
164
165 output_status
166 }
167}
168
/// The aggregate health of one collection, as produced by
/// `PerWorkerHealthStatus::decide_status`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub enum OverallStatus {
    /// No errors are recorded and no non-sidechannel namespace has reported
    /// `Running` yet.
    Starting,
    /// No errors are recorded and at least one non-sidechannel namespace
    /// reports `Running`.
    Running,
    /// At least one namespace on some worker currently reports an error.
    Stalled {
        /// Headline error, formatted as `"<namespace>: <error>"` and taken from
        /// the greatest namespace that has an error.
        error: String,
        /// Hints attached to the currently recorded `Stalled` updates.
        hints: BTreeSet<String>,
        /// The error kept for each namespace (the greatest error string across
        /// workers).
        namespaced_errors: BTreeMap<StatusNamespace, String>,
    },
    /// NOTE(review): `decide_status` never produces this variant; a `Ceased`
    /// update is folded into `Stalled` there. Confirm whether anything outside
    /// this file constructs it.
    Ceased {
        error: String,
    },
}
182
183impl OverallStatus {
184 pub(crate) fn error(&self) -> Option<&str> {
186 match self {
187 OverallStatus::Starting | OverallStatus::Running => None,
188 OverallStatus::Stalled { error, .. } | OverallStatus::Ceased { error, .. } => {
189 Some(error)
190 }
191 }
192 }
193
194 pub(crate) fn errors(&self) -> Option<&BTreeMap<StatusNamespace, String>> {
196 match self {
197 OverallStatus::Starting | OverallStatus::Running | OverallStatus::Ceased { .. } => None,
198 OverallStatus::Stalled {
199 namespaced_errors, ..
200 } => Some(namespaced_errors),
201 }
202 }
203
204 pub(crate) fn hints(&self) -> BTreeSet<String> {
206 match self {
207 OverallStatus::Starting | OverallStatus::Running | OverallStatus::Ceased { .. } => {
208 BTreeSet::new()
209 }
210 OverallStatus::Stalled { hints, .. } => hints.clone(),
211 }
212 }
213}
214
215impl<'a> From<&'a OverallStatus> for Status {
216 fn from(val: &'a OverallStatus) -> Self {
217 match val {
218 OverallStatus::Starting => Status::Starting,
219 OverallStatus::Running => Status::Running,
220 OverallStatus::Stalled { .. } => Status::Stalled,
221 OverallStatus::Ceased { .. } => Status::Ceased,
222 }
223 }
224}
225
/// Everything the health operator tracks for a single collection id.
#[derive(Debug)]
struct HealthState {
    /// The latest per-worker, per-namespace updates received for the collection.
    healths: PerWorkerHealthStatus,
    /// The status most recently handed to `HealthOperator::record_new_status`,
    /// used to report only on change. `None` until something has been reported.
    last_reported_status: Option<OverallStatus>,
    /// The most recent update whose `should_halt()` was true, together with
    /// its namespace. The health operator never resets it once set.
    halt_with: Option<(StatusNamespace, HealthStatusUpdate)>,
}
232
233impl HealthState {
234 fn new(worker_count: usize) -> HealthState {
235 HealthState {
236 healths: PerWorkerHealthStatus {
237 errors_by_worker: vec![Default::default(); worker_count],
238 },
239 last_reported_status: None,
240 halt_with: None,
241 }
242 }
243}
244
/// The output side of `health_operator`: how status changes are recorded and
/// how a halt is requested.
pub trait HealthOperator {
    /// Records a new status for `collection_id`.
    ///
    /// `health_operator` calls this once per tracked collection at startup (on
    /// the active worker only) and afterwards whenever the computed overall
    /// status of a collection differs from the last one reported.
    fn record_new_status(
        &self,
        collection_id: GlobalId,
        // Wall-clock time at which the status was computed.
        ts: DateTime<Utc>,
        new_status: Status,
        // The headline error, if the status carries one.
        new_error: Option<&str>,
        hints: &BTreeSet<String>,
        // Per-namespace errors; empty when the status carries none.
        namespaced_errors: &BTreeMap<StatusNamespace, String>,
        // Passed through unchanged from the `write_namespaced_map` argument of
        // `health_operator`; implementations decide what to do with it.
        write_namespaced_map: bool,
    );

    /// Requests a halt on behalf of `id`.
    ///
    /// `health_operator` calls this, after its configured delay, once an
    /// update whose `should_halt()` is true has been received. `error` is the
    /// namespace and update that triggered the halt.
    fn send_halt(&self, id: GlobalId, error: Option<(StatusNamespace, HealthStatusUpdate)>);

    /// Optionally pins the worker index that processes all health messages.
    ///
    /// With the default `None`, `health_operator` derives the worker from a
    /// hash of the first id in its `mark_starting` set.
    fn chosen_worker(&self) -> Option<usize> {
        None
    }
}
272
/// A [`HealthOperator`] that buffers status updates in a shared vector and
/// requests halts through the internal command channel.
pub struct DefaultWriter {
    /// Channel on which `send_halt` sends
    /// `InternalStorageCommand::SuspendAndRestart`.
    pub command_tx: InternalCommandSender,
    /// Shared buffer that `record_new_status` appends a `StatusUpdate` to.
    pub updates: Rc<RefCell<Vec<StatusUpdate>>>,
}
278
279impl HealthOperator for DefaultWriter {
280 fn record_new_status(
281 &self,
282 collection_id: GlobalId,
283 ts: DateTime<Utc>,
284 status: Status,
285 new_error: Option<&str>,
286 hints: &BTreeSet<String>,
287 namespaced_errors: &BTreeMap<StatusNamespace, String>,
288 write_namespaced_map: bool,
289 ) {
290 self.updates.borrow_mut().push(StatusUpdate {
291 id: collection_id,
292 timestamp: ts,
293 status,
294 error: new_error.map(|e| e.to_string()),
295 hints: hints.clone(),
296 namespaced_errors: if write_namespaced_map {
297 namespaced_errors
298 .iter()
299 .map(|(ns, val)| (ns.to_string(), val.clone()))
300 .collect()
301 } else {
302 BTreeMap::new()
303 },
304 replica_id: None,
305 });
306 }
307
308 fn send_halt(&self, id: GlobalId, error: Option<(StatusNamespace, HealthStatusUpdate)>) {
309 self.command_tx
310 .send(InternalStorageCommand::SuspendAndRestart {
311 id,
314 reason: format!("{:?}", error),
315 });
316 }
317}
318
/// A single health update as it flows into `health_operator`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub struct HealthStatusMessage {
    /// The collection the update is about. `None` is resolved by
    /// `health_operator` to its `halting_id`.
    pub id: Option<GlobalId>,
    /// The namespace the update is reported under.
    pub namespace: StatusNamespace,
    /// The update itself.
    pub update: HealthStatusUpdate,
}
330
331pub(crate) fn health_operator<G, P>(
339 scope: &G,
340 now: NowFn,
341 mark_starting: BTreeSet<GlobalId>,
343 halting_id: GlobalId,
346 object_type: &'static str,
348 health_stream: &Stream<G, HealthStatusMessage>,
350 health_operator_impl: P,
352 write_namespaced_map: bool,
354 suspend_and_restart_delay: Duration,
357) -> PressOnDropButton
358where
359 G: Scope,
360 P: HealthOperator + 'static,
361{
362 let healthcheck_worker_id = scope.index();
364 let worker_count = scope.peers();
365
366 let health_stream = health_stream.map(move |status| (healthcheck_worker_id, status));
368
369 let chosen_worker_id = if let Some(index) = health_operator_impl.chosen_worker() {
370 index
371 } else {
372 usize::cast_from(mark_starting.iter().next().hashed()) % worker_count
375 };
376
377 let is_active_worker = chosen_worker_id == healthcheck_worker_id;
378
379 let operator_name = format!("healthcheck({})", healthcheck_worker_id);
380 let mut health_op = AsyncOperatorBuilder::new(operator_name, scope.clone());
381
382 let mut input = health_op.new_disconnected_input(
383 &health_stream,
384 Exchange::new(move |_| u64::cast_from(chosen_worker_id)),
385 );
386
387 let button = health_op.build(move |mut _capabilities| async move {
388 let mut health_states: BTreeMap<_, _> = mark_starting
389 .iter()
390 .copied()
391 .chain([halting_id])
392 .map(|id| (id, HealthState::new(worker_count)))
393 .collect();
394
395 if is_active_worker {
397 for (id, state) in health_states.iter_mut() {
398 let status = OverallStatus::Starting;
399 let timestamp = mz_ore::now::to_datetime(now());
400 health_operator_impl.record_new_status(
401 *id,
402 timestamp,
403 (&status).into(),
404 status.error(),
405 &status.hints(),
406 status.errors().unwrap_or(&BTreeMap::new()),
407 write_namespaced_map,
408 );
409
410 state.last_reported_status = Some(status);
411 }
412 }
413
414 let mut outputs_seen = BTreeMap::<GlobalId, BTreeSet<_>>::new();
415 while let Some(event) = input.next().await {
416 if let AsyncEvent::Data(_cap, rows) = event {
417 for (worker_id, message) in rows {
418 let HealthStatusMessage {
419 id,
420 namespace: ns,
421 update: health_event,
422 } = message;
423 let id = id.unwrap_or(halting_id);
424 let HealthState {
425 healths, halt_with, ..
426 } = match health_states.get_mut(&id) {
427 Some(health) => health,
428 None => continue,
432 };
433
434 let new_round = outputs_seen
437 .entry(id)
438 .or_insert_with(BTreeSet::new)
439 .insert(ns.clone());
440
441 if !is_active_worker {
442 error!(
443 "Health messages for {object_type} {id} passed to \
444 an unexpected worker id: {healthcheck_worker_id}"
445 )
446 }
447
448 if health_event.should_halt() {
449 *halt_with = Some((ns.clone(), health_event.clone()));
450 }
451
452 healths.merge_update(worker_id, ns, health_event, !new_round);
453 }
454
455 let mut halt_with_outer = None;
456
457 while let Some((id, _)) = outputs_seen.pop_first() {
458 let HealthState {
459 healths,
460 last_reported_status,
461 halt_with,
462 } = health_states.get_mut(&id).expect("known to exist");
463
464 let new_status = healths.decide_status();
465
466 if Some(&new_status) != last_reported_status.as_ref() {
467 info!(
468 "Health transition for {object_type} {id}: \
469 {last_reported_status:?} -> {:?}",
470 Some(&new_status)
471 );
472
473 let timestamp = mz_ore::now::to_datetime(now());
474 health_operator_impl.record_new_status(
475 id,
476 timestamp,
477 (&new_status).into(),
478 new_status.error(),
479 &new_status.hints(),
480 new_status.errors().unwrap_or(&BTreeMap::new()),
481 write_namespaced_map,
482 );
483
484 *last_reported_status = Some(new_status.clone());
485 }
486
487 if halt_with_outer.is_none() && halt_with.is_some() {
489 halt_with_outer = Some((id, halt_with.clone()));
490 }
491 }
492
493 if let Some((id, halt_with)) = halt_with_outer {
498 mz_ore::soft_assert_or_log!(
499 id == halting_id,
500 "sub{object_type}s should not produce \
501 halting errors, however {:?} halted while primary \
502 {object_type} is {:?}",
503 id,
504 halting_id
505 );
506
507 info!(
508 "Broadcasting suspend-and-restart \
509 command because of {:?} after {:?} delay",
510 halt_with, suspend_and_restart_delay
511 );
512 tokio::time::sleep(suspend_and_restart_delay).await;
513 health_operator_impl.send_halt(id, halt_with);
514 }
515 }
516 }
517 });
518
519 button.press_on_drop()
520}
521
522use serde::{Deserialize, Serialize};
523
/// A health update reported for one namespace of one collection.
///
/// The derived `Ord` (declaration order first, then fields) is what
/// `PerWorkerHealthStatus::merge_update` uses to decide whether a later update
/// within the same batch may replace an earlier one.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub enum HealthStatusUpdate {
    /// The namespace is healthy.
    Running,
    /// The namespace hit an error.
    Stalled {
        /// The error message.
        error: String,
        /// An optional hint that is surfaced next to the error.
        hint: Option<String>,
        /// Whether the health operator should request a halt (see
        /// `should_halt`).
        should_halt: bool,
    },
    /// Treated like an error without a hint when statuses are aggregated, and
    /// never requests a halt.
    Ceased {
        error: String,
    },
}
539
540impl HealthStatusUpdate {
541 pub(crate) fn running() -> Self {
543 HealthStatusUpdate::Running
544 }
545
546 pub(crate) fn stalled(error: String, hint: Option<String>) -> Self {
548 HealthStatusUpdate::Stalled {
549 error,
550 hint,
551 should_halt: false,
552 }
553 }
554
555 pub(crate) fn halting(error: String, hint: Option<String>) -> Self {
557 HealthStatusUpdate::Stalled {
558 error,
559 hint,
560 should_halt: true,
561 }
562 }
563
564 pub(crate) fn should_halt(&self) -> bool {
572 match self {
573 HealthStatusUpdate::Running |
574 HealthStatusUpdate::Ceased { .. } => false,
579 HealthStatusUpdate::Stalled { should_halt, .. } => *should_halt,
580 }
581 }
582}
583
584#[cfg(test)]
585mod tests {
586 use super::*;
587 use itertools::Itertools;
588
    /// Exercises basic status transitions with two inputs spread over two
    /// workers.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn test_health_operator_basic() {
        use Step::*;

        // Arguments: 2 workers, 2 inputs, namespaced error map is written.
        health_operator_runner(
            2,
            2,
            true,
            vec![
                // Both inputs are reported as starting at startup.
                AssertStatus(vec![
                    StatusToAssert {
                        collection_index: 0,
                        status: Status::Starting,
                        ..Default::default()
                    },
                    StatusToAssert {
                        collection_index: 1,
                        status: Status::Starting,
                        ..Default::default()
                    },
                ]),
                // An update without an id is attributed to the halting id,
                // which the runner sets to the first input (index 0).
                Update(TestUpdate {
                    worker_id: 1,
                    namespace: StatusNamespace::Generator,
                    id: None,
                    update: HealthStatusUpdate::running(),
                }),
                AssertStatus(vec![StatusToAssert {
                    collection_index: 0,
                    status: Status::Running,
                    ..Default::default()
                }]),
                // The second input is addressed explicitly by id.
                Update(TestUpdate {
                    worker_id: 1,
                    namespace: StatusNamespace::Generator,
                    id: Some(GlobalId::User(1)),
                    update: HealthStatusUpdate::running(),
                }),
                AssertStatus(vec![StatusToAssert {
                    collection_index: 1,
                    status: Status::Running,
                    ..Default::default()
                }]),
                // A stall on a different worker stalls the whole input, even
                // though worker 1 still reports running.
                Update(TestUpdate {
                    worker_id: 0,
                    namespace: StatusNamespace::Generator,
                    id: Some(GlobalId::User(1)),
                    update: HealthStatusUpdate::stalled("uhoh".to_string(), None),
                }),
                AssertStatus(vec![StatusToAssert {
                    collection_index: 1,
                    status: Status::Stalled,
                    error: Some("generator: uhoh".to_string()),
                    errors: Some("generator: uhoh".to_string()),
                    ..Default::default()
                }]),
                // Once worker 0 recovers, the input is running again.
                Update(TestUpdate {
                    worker_id: 0,
                    namespace: StatusNamespace::Generator,
                    id: Some(GlobalId::User(1)),
                    update: HealthStatusUpdate::running(),
                }),
                AssertStatus(vec![StatusToAssert {
                    collection_index: 1,
                    status: Status::Running,
                    ..Default::default()
                }]),
            ],
        );
    }
672
    /// Checks that no namespaced error map is reported when
    /// `write_namespaced_map` is disabled.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn test_health_operator_write_namespaced_map() {
        use Step::*;

        // Arguments: 2 workers, 2 inputs, namespaced error map is NOT written.
        health_operator_runner(
            2,
            2,
            false,
            vec![
                // Both inputs are reported as starting at startup.
                AssertStatus(vec![
                    StatusToAssert {
                        collection_index: 0,
                        status: Status::Starting,
                        ..Default::default()
                    },
                    StatusToAssert {
                        collection_index: 1,
                        status: Status::Starting,
                        ..Default::default()
                    },
                ]),
                Update(TestUpdate {
                    worker_id: 0,
                    namespace: StatusNamespace::Generator,
                    id: Some(GlobalId::User(1)),
                    update: HealthStatusUpdate::stalled("uhoh".to_string(), None),
                }),
                // The headline error is still present, the error map is not.
                AssertStatus(vec![StatusToAssert {
                    collection_index: 1,
                    status: Status::Stalled,
                    error: Some("generator: uhoh".to_string()),
                    errors: None,
                    ..Default::default()
                }]),
            ],
        )
    }
714
    /// Checks how errors from several namespaces of the same input combine.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn test_health_operator_namespaces() {
        use Step::*;

        // Arguments: 2 workers, 1 input, namespaced error map is written.
        health_operator_runner(
            2,
            1,
            true,
            vec![
                // The input is reported as starting at startup.
                AssertStatus(vec![
                    StatusToAssert {
                        collection_index: 0,
                        status: Status::Starting,
                        ..Default::default()
                    },
                ]),
                // A first namespace stalls.
                Update(TestUpdate {
                    worker_id: 0,
                    namespace: StatusNamespace::Generator,
                    id: None,
                    update: HealthStatusUpdate::stalled("uhoh".to_string(), None),
                }),
                AssertStatus(vec![StatusToAssert {
                    collection_index: 0,
                    status: Status::Stalled,
                    error: Some("generator: uhoh".to_string()),
                    errors: Some("generator: uhoh".to_string()),
                    ..Default::default()
                }]),
                // A second namespace stalls too. `Kafka` orders after
                // `Generator`, so it provides the headline error, while the
                // map lists both.
                Update(TestUpdate {
                    worker_id: 0,
                    namespace: StatusNamespace::Kafka,
                    id: None,
                    update: HealthStatusUpdate::stalled("uhoh".to_string(), None),
                }),
                AssertStatus(vec![StatusToAssert {
                    collection_index: 0,
                    status: Status::Stalled,
                    error: Some("kafka: uhoh".to_string()),
                    errors: Some("generator: uhoh, kafka: uhoh".to_string()),
                    ..Default::default()
                }]),
                // Kafka recovers; the generator error is still outstanding.
                Update(TestUpdate {
                    worker_id: 0,
                    namespace: StatusNamespace::Kafka,
                    id: None,
                    update: HealthStatusUpdate::running(),
                }),
                AssertStatus(vec![StatusToAssert {
                    collection_index: 0,
                    status: Status::Stalled,
                    error: Some("generator: uhoh".to_string()),
                    errors: Some("generator: uhoh".to_string()),
                    ..Default::default()
                }]),
                // With every namespace recovered, the input is running.
                Update(TestUpdate {
                    worker_id: 0,
                    namespace: StatusNamespace::Generator,
                    id: None,
                    update: HealthStatusUpdate::running(),
                }),
                AssertStatus(vec![StatusToAssert {
                    collection_index: 0,
                    status: Status::Running,
                    ..Default::default()
                }]),
            ],
        );
    }
791
    /// Checks that a side-channel namespace (`Ssh`) can stall an input but
    /// cannot, on its own, mark it as running.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn test_health_operator_namespace_side_channel() {
        use Step::*;

        // Arguments: 2 workers, 1 input, namespaced error map is written.
        health_operator_runner(
            2,
            1,
            true,
            vec![
                // The input is reported as starting at startup.
                AssertStatus(vec![
                    StatusToAssert {
                        collection_index: 0,
                        status: Status::Starting,
                        ..Default::default()
                    },
                ]),
                // An error on the side channel stalls the input.
                Update(TestUpdate {
                    worker_id: 0,
                    namespace: StatusNamespace::Ssh,
                    id: None,
                    update: HealthStatusUpdate::stalled("uhoh".to_string(), None),
                }),
                AssertStatus(vec![StatusToAssert {
                    collection_index: 0,
                    status: Status::Stalled,
                    error: Some("ssh: uhoh".to_string()),
                    errors: Some("ssh: uhoh".to_string()),
                    ..Default::default()
                }]),
                // A newer error replaces the previous one.
                Update(TestUpdate {
                    worker_id: 0,
                    namespace: StatusNamespace::Ssh,
                    id: None,
                    update: HealthStatusUpdate::stalled("uhoh2".to_string(), None),
                }),
                AssertStatus(vec![StatusToAssert {
                    collection_index: 0,
                    status: Status::Stalled,
                    error: Some("ssh: uhoh2".to_string()),
                    errors: Some("ssh: uhoh2".to_string()),
                    ..Default::default()
                }]),
                // `Running` from the side channel clears the error but only
                // takes the input back to `Starting`, not to `Running`.
                Update(TestUpdate {
                    worker_id: 0,
                    namespace: StatusNamespace::Ssh,
                    id: None,
                    update: HealthStatusUpdate::running(),
                }),
                AssertStatus(vec![StatusToAssert {
                    collection_index: 0,
                    status: Status::Starting,
                    ..Default::default()
                }]),
                // `Running` from a regular namespace marks the input running.
                Update(TestUpdate {
                    worker_id: 0,
                    namespace: StatusNamespace::Generator,
                    id: None,
                    update: HealthStatusUpdate::running(),
                }),
                AssertStatus(vec![StatusToAssert {
                    collection_index: 0,
                    status: Status::Running,
                    ..Default::default()
                }]),
            ],
        );
    }
865
    /// Checks how hints from several workers are collected and dropped again.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn test_health_operator_hints() {
        use Step::*;

        // Arguments: 2 workers, 1 input, namespaced error map is written.
        health_operator_runner(
            2,
            1,
            true,
            vec![
                // The input is reported as starting at startup.
                AssertStatus(vec![
                    StatusToAssert {
                        collection_index: 0,
                        status: Status::Starting,
                        ..Default::default()
                    },
                ]),
                // Worker 0 stalls with a hint.
                Update(TestUpdate {
                    worker_id: 0,
                    namespace: StatusNamespace::Generator,
                    id: None,
                    update: HealthStatusUpdate::stalled(
                        "uhoh".to_string(),
                        Some("hint1".to_string()),
                    ),
                }),
                AssertStatus(vec![StatusToAssert {
                    collection_index: 0,
                    status: Status::Stalled,
                    error: Some("generator: uhoh".to_string()),
                    errors: Some("generator: uhoh".to_string()),
                    hint: Some("hint1".to_string()),
                }]),
                // Worker 1 stalls in the same namespace with another hint.
                Update(TestUpdate {
                    worker_id: 1,
                    namespace: StatusNamespace::Generator,
                    id: None,
                    update: HealthStatusUpdate::stalled(
                        "uhoh2".to_string(),
                        Some("hint2".to_string()),
                    ),
                }),
                // The greater error string ("uhoh2") is reported for the
                // namespace, and the hints of both workers are combined.
                AssertStatus(vec![StatusToAssert {
                    collection_index: 0,
                    status: Status::Stalled,
                    error: Some("generator: uhoh2".to_string()),
                    errors: Some("generator: uhoh2".to_string()),
                    hint: Some("hint1, hint2".to_string()),
                }]),
                // Worker 1 repeats its error with a different hint.
                Update(TestUpdate {
                    worker_id: 1,
                    namespace: StatusNamespace::Generator,
                    id: None,
                    update: HealthStatusUpdate::stalled(
                        "uhoh2".to_string(),
                        Some("hint3".to_string()),
                    ),
                }),
                // Worker 1's previous update, and with it "hint2", is replaced.
                AssertStatus(vec![StatusToAssert {
                    collection_index: 0,
                    status: Status::Stalled,
                    error: Some("generator: uhoh2".to_string()),
                    errors: Some("generator: uhoh2".to_string()),
                    hint: Some("hint1, hint3".to_string()),
                }]),
                // Worker 0 recovers, which removes its hint; worker 1 is
                // still stalled.
                Update(TestUpdate {
                    worker_id: 0,
                    namespace: StatusNamespace::Generator,
                    id: None,
                    update: HealthStatusUpdate::running(),
                }),
                AssertStatus(vec![StatusToAssert {
                    collection_index: 0,
                    status: Status::Stalled,
                    error: Some("generator: uhoh2".to_string()),
                    errors: Some("generator: uhoh2".to_string()),
                    hint: Some("hint3".to_string()),
                }]),
                // Worker 1 recovers as well; the input is running.
                Update(TestUpdate {
                    worker_id: 1,
                    namespace: StatusNamespace::Generator,
                    id: None,
                    update: HealthStatusUpdate::running(),
                }),
                AssertStatus(vec![StatusToAssert {
                    collection_index: 0,
                    status: Status::Running,
                    ..Default::default()
                }]),
            ],
        );
    }
965
966 use mz_ore::assert_err;
969 use timely::container::CapacityContainerBuilder;
970 use timely::dataflow::Scope;
971 use timely::dataflow::operators::Enter;
972 use timely::dataflow::operators::exchange::Exchange;
973 use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
974
    /// A status report as captured by `TestWriter`, and equally the
    /// expectation it is compared against.
    #[derive(Debug, Clone, PartialEq, Eq)]
    struct StatusToAssert {
        /// Index of the input the status belongs to (see
        /// `TestWriter::input_mapping`).
        collection_index: usize,
        status: Status,
        /// The headline error, if any.
        error: Option<String>,
        /// The namespaced errors rendered as `"<ns>: <err>"` joined with
        /// `", "`; `None` if there are none or the map is not written.
        errors: Option<String>,
        /// All hints joined with `", "`; `None` if there are none.
        hint: Option<String>,
    }
984
985 impl Default for StatusToAssert {
986 fn default() -> Self {
987 StatusToAssert {
988 collection_index: Default::default(),
989 status: Status::Running,
990 error: Default::default(),
991 errors: Default::default(),
992 hint: Default::default(),
993 }
994 }
995 }
996
    /// A health update to inject into the dataflow under test.
    #[derive(Debug, Clone)]
    struct TestUpdate {
        /// The worker the update is exchanged to by `producer`, so that it
        /// enters the health operator from that worker.
        worker_id: u64,
        namespace: StatusNamespace,
        /// The collection the update targets; `None` is resolved to the
        /// halting id by the health operator.
        id: Option<GlobalId>,
        update: HealthStatusUpdate,
    }
1006
    /// One step of a test scenario executed by `health_operator_runner`.
    #[derive(Debug, Clone)]
    enum Step {
        /// Inject a health update into the dataflow.
        Update(TestUpdate),
        /// Wait until one status report has arrived for every listed
        /// expectation and compare each against the expectation with the same
        /// `collection_index`. Reports may arrive in any order.
        AssertStatus(Vec<StatusToAssert>),
    }
1015
    /// A [`HealthOperator`] that forwards every recorded status to the test
    /// driver over a channel.
    struct TestWriter {
        /// Channel on which recorded statuses are sent to the test driver.
        sender: UnboundedSender<StatusToAssert>,
        /// Maps each collection id to the input index used in expectations.
        input_mapping: BTreeMap<GlobalId, usize>,
    }
1020
1021 impl HealthOperator for TestWriter {
1022 fn record_new_status(
1023 &self,
1024 collection_id: GlobalId,
1025 _ts: DateTime<Utc>,
1026 status: Status,
1027 new_error: Option<&str>,
1028 hints: &BTreeSet<String>,
1029 namespaced_errors: &BTreeMap<StatusNamespace, String>,
1030 write_namespaced_map: bool,
1031 ) {
1032 let _ = self.sender.send(StatusToAssert {
1033 collection_index: *self.input_mapping.get(&collection_id).unwrap(),
1034 status,
1035 error: new_error.map(str::to_string),
1036 errors: if !namespaced_errors.is_empty() && write_namespaced_map {
1037 Some(
1038 namespaced_errors
1039 .iter()
1040 .map(|(ns, err)| format!("{}: {}", ns, err))
1041 .join(", "),
1042 )
1043 } else {
1044 None
1045 },
1046 hint: if !hints.is_empty() {
1047 Some(hints.iter().join(", "))
1048 } else {
1049 None
1050 },
1051 });
1052 }
1053
1054 fn send_halt(&self, _id: GlobalId, _error: Option<(StatusNamespace, HealthStatusUpdate)>) {
1055 unimplemented!()
1057 }
1058
1059 fn chosen_worker(&self) -> Option<usize> {
1060 Some(0)
1062 }
1063 }
1064
    /// Runs `steps` against a health operator rendered in a fresh timely
    /// cluster.
    ///
    /// * `workers`: number of timely worker threads in the process.
    /// * `inputs`: number of collections to track; input `i` gets the id
    ///   `GlobalId::User(i)`, and the first one doubles as the halting id.
    /// * `write_namespaced_map`: forwarded to `health_operator`.
    /// * `steps`: the scenario; only worker 0 executes it.
    ///
    /// Panics if an assertion of the scenario fails, or if a status report is
    /// still pending after the last step.
    fn health_operator_runner(
        workers: usize,
        inputs: usize,
        write_namespaced_map: bool,
        steps: Vec<Step>,
    ) {
        let tokio_runtime = tokio::runtime::Runtime::new().unwrap();
        let tokio_handle = tokio_runtime.handle().clone();

        // Map every collection id to its input index.
        let inputs: BTreeMap<GlobalId, usize> = (0..inputs)
            .map(|index| (GlobalId::User(u64::cast_from(index)), index))
            .collect();

        timely::execute::execute(
            timely::execute::Config {
                communication: timely::CommunicationConfig::Process(workers),
                worker: Default::default(),
            },
            move |worker| {
                let steps = steps.clone();
                let inputs = inputs.clone();

                // Enter the tokio runtime on this worker thread for the
                // lifetime of the guard.
                let _tokio_guard = tokio_handle.enter();
                // `in_*` feeds updates into the dataflow, `out_*` carries the
                // statuses recorded by `TestWriter` back to the driver.
                let (in_tx, in_rx) = unbounded_channel();
                let (out_tx, mut out_rx) = unbounded_channel();

                worker.dataflow::<(), _, _>(|root_scope| {
                    root_scope
                        .clone()
                        .scoped::<mz_repr::Timestamp, _, _>("gus", |scope| {
                            let input = producer(root_scope.clone(), in_rx).enter(scope);
                            // Leak the returned button so that it is never
                            // dropped (dropping it would press it).
                            Box::leak(Box::new(health_operator(
                                scope,
                                mz_ore::now::SYSTEM_TIME.clone(),
                                inputs.keys().copied().collect(),
                                *inputs.first_key_value().unwrap().0,
                                "source_test",
                                &input,
                                TestWriter {
                                    sender: out_tx,
                                    input_mapping: inputs,
                                },
                                write_namespaced_map,
                                Duration::from_secs(5),
                            )));
                        });
                });

                // Only worker 0 drives the scenario.
                if worker.index() == 0 {
                    use Step::*;
                    for step in steps {
                        match step {
                            Update(update) => {
                                let _ = in_tx.send(update);
                            }
                            AssertStatus(mut statuses) => loop {
                                match out_rx.try_recv() {
                                    // Nothing reported yet: let the dataflow
                                    // make progress and poll again shortly.
                                    Err(_) => {
                                        worker.step();
                                        std::thread::sleep(std::time::Duration::from_millis(50));
                                    }
                                    Ok(update) => {
                                        // Find the expectation for the same
                                        // input; panics if there is none.
                                        let pos = statuses
                                            .iter()
                                            .position(|s| {
                                                s.collection_index == update.collection_index
                                            })
                                            .unwrap();

                                        let status_to_assert = &statuses[pos];
                                        assert_eq!(&update, status_to_assert);

                                        statuses.remove(pos);
                                        if statuses.is_empty() {
                                            break;
                                        }
                                    }
                                }
                            },
                        }
                    }

                    // No unexpected status may be left over after the scenario.
                    assert_err!(out_rx.try_recv());
                }
            },
        )
        .unwrap();
    }
1158
1159 fn producer<G: Scope<Timestamp = ()>>(
1166 scope: G,
1167 mut input: UnboundedReceiver<TestUpdate>,
1168 ) -> Stream<G, HealthStatusMessage> {
1169 let mut iterator = AsyncOperatorBuilder::new("iterator".to_string(), scope.clone());
1170 let (output_handle, output) = iterator.new_output::<CapacityContainerBuilder<Vec<_>>>();
1171
1172 let index = scope.index();
1173 iterator.build(|mut caps| async move {
1174 if index != 0 {
1176 return;
1177 }
1178 let mut capability = Some(caps.pop().unwrap());
1179 while let Some(element) = input.recv().await {
1180 output_handle.give(
1181 capability.as_ref().unwrap(),
1182 (
1183 element.worker_id,
1184 element.id,
1185 element.namespace,
1186 element.update,
1187 ),
1188 );
1189 }
1190
1191 capability.take();
1192 });
1193
1194 let output = output.exchange(|d| d.0).map(|d| HealthStatusMessage {
1195 id: d.1,
1196 namespace: d.2,
1197 update: d.3,
1198 });
1199
1200 output
1201 }
1202}