1use std::cell::RefCell;
13use std::collections::btree_map::Entry;
14use std::collections::{BTreeMap, BTreeSet};
15use std::fmt;
16use std::fmt::Debug;
17use std::rc::Rc;
18use std::time::Duration;
19
20use chrono::{DateTime, Utc};
21use differential_dataflow::Hashable;
22use futures::StreamExt;
23use mz_ore::cast::CastFrom;
24use mz_ore::now::NowFn;
25use mz_repr::GlobalId;
26use mz_storage_client::client::{Status, StatusUpdate};
27use mz_timely_util::builder_async::{
28 Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
29};
30use timely::dataflow::channels::pact::Exchange;
31use timely::dataflow::operators::{Enter, Map};
32use timely::dataflow::scopes::Child;
33use timely::dataflow::{Scope, Stream};
34use tracing::{error, info};
35
36use crate::internal_control::{InternalCommandSender, InternalStorageCommand};
37
/// The namespace a health update belongs to.
///
/// Health is tracked independently per namespace (see
/// `PerWorkerHealthStatus`), and the `Display` form of the namespace is
/// prefixed to the error reported to users.
///
/// NOTE: the declaration order of the variants matters. The derived `Ord`
/// follows it, and `PerWorkerHealthStatus::decide_status` reports the error of
/// the greatest namespace that currently has one, so variants declared later
/// take precedence over earlier ones.
#[derive(Copy, Clone, Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub enum StatusNamespace {
    Generator,
    Kafka,
    Postgres,
    MySql,
    SqlServer,
    /// The only namespace `is_sidechannel` treats as a side channel: a
    /// `Running` update here does not by itself make the overall status
    /// `Running`.
    Ssh,
    Upsert,
    Decode,
    Internal,
}
59
60impl StatusNamespace {
61 fn is_sidechannel(&self) -> bool {
62 matches!(self, StatusNamespace::Ssh)
63 }
64}
65
66impl fmt::Display for StatusNamespace {
67 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
68 use StatusNamespace::*;
69 match self {
70 Generator => write!(f, "generator"),
71 Kafka => write!(f, "kafka"),
72 Postgres => write!(f, "postgres"),
73 MySql => write!(f, "mysql"),
74 SqlServer => write!(f, "sql-server"),
75 Ssh => write!(f, "ssh"),
76 Upsert => write!(f, "upsert"),
77 Decode => write!(f, "decode"),
78 Internal => write!(f, "internal"),
79 }
80 }
81}
82
/// The health updates currently retained for one object, tracked separately
/// for every worker and, within a worker, for every namespace.
#[derive(Debug)]
struct PerWorkerHealthStatus {
    /// Indexed by worker id. Each element maps a namespace to the update
    /// currently held for that namespace on that worker.
    pub(crate) errors_by_worker: Vec<BTreeMap<StatusNamespace, HealthStatusUpdate>>,
}
87
88impl PerWorkerHealthStatus {
89 fn merge_update(
90 &mut self,
91 worker: usize,
92 namespace: StatusNamespace,
93 update: HealthStatusUpdate,
94 only_greater: bool,
95 ) {
96 let errors = &mut self.errors_by_worker[worker];
97 match errors.entry(namespace) {
98 Entry::Vacant(v) => {
99 v.insert(update);
100 }
101 Entry::Occupied(mut o) => {
102 if !only_greater || o.get() < &update {
103 o.insert(update);
104 }
105 }
106 }
107 }
108
109 fn decide_status(&self) -> OverallStatus {
110 let mut output_status = OverallStatus::Starting;
111 let mut namespaced_errors: BTreeMap<StatusNamespace, String> = BTreeMap::new();
112 let mut hints: BTreeSet<String> = BTreeSet::new();
113
114 for status in self.errors_by_worker.iter() {
115 for (ns, ns_status) in status.iter() {
116 match ns_status {
117 HealthStatusUpdate::Ceased { error } => {
122 if Some(error) > namespaced_errors.get(ns).as_deref() {
123 namespaced_errors.insert(*ns, error.to_string());
124 }
125 }
126 HealthStatusUpdate::Stalled { error, hint, .. } => {
127 if Some(error) > namespaced_errors.get(ns).as_deref() {
128 namespaced_errors.insert(*ns, error.to_string());
129 }
130
131 if let Some(hint) = hint {
132 hints.insert(hint.to_string());
133 }
134 }
135 HealthStatusUpdate::Running => {
136 if !ns.is_sidechannel() {
137 output_status = OverallStatus::Running;
138 }
139 }
140 }
141 }
142 }
143
144 if !namespaced_errors.is_empty() {
145 let (ns, err) = namespaced_errors.last_key_value().unwrap();
147 output_status = OverallStatus::Stalled {
148 error: format!("{}: {}", ns, err),
149 hints,
150 namespaced_errors,
151 }
152 }
153
154 output_status
155 }
156}
157
/// The status of one object after the updates of all workers and namespaces
/// have been combined.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub enum OverallStatus {
    /// No namespace has reported an error, and no non-side-channel namespace
    /// has reported `Running`.
    Starting,
    /// No namespace has reported an error, and at least one non-side-channel
    /// namespace has reported `Running`.
    Running,
    /// At least one namespace has reported an error.
    Stalled {
        /// The headline error, formatted by `decide_status` as
        /// `"<namespace>: <error>"`.
        error: String,
        /// The hints attached to the stalled updates that were combined.
        hints: BTreeSet<String>,
        /// The error retained for each namespace that reported one.
        namespaced_errors: BTreeMap<StatusNamespace, String>,
    },
    /// NOTE(review): nothing in this file constructs this variant;
    /// `decide_status` reports ceased updates as `Stalled`.
    Ceased {
        error: String,
    },
}
171
172impl OverallStatus {
173 pub(crate) fn error(&self) -> Option<&str> {
175 match self {
176 OverallStatus::Starting | OverallStatus::Running => None,
177 OverallStatus::Stalled { error, .. } | OverallStatus::Ceased { error, .. } => {
178 Some(error)
179 }
180 }
181 }
182
183 pub(crate) fn errors(&self) -> Option<&BTreeMap<StatusNamespace, String>> {
185 match self {
186 OverallStatus::Starting | OverallStatus::Running | OverallStatus::Ceased { .. } => None,
187 OverallStatus::Stalled {
188 namespaced_errors, ..
189 } => Some(namespaced_errors),
190 }
191 }
192
193 pub(crate) fn hints(&self) -> BTreeSet<String> {
195 match self {
196 OverallStatus::Starting | OverallStatus::Running | OverallStatus::Ceased { .. } => {
197 BTreeSet::new()
198 }
199 OverallStatus::Stalled { hints, .. } => hints.clone(),
200 }
201 }
202}
203
204impl<'a> From<&'a OverallStatus> for Status {
205 fn from(val: &'a OverallStatus) -> Self {
206 match val {
207 OverallStatus::Starting => Status::Starting,
208 OverallStatus::Running => Status::Running,
209 OverallStatus::Stalled { .. } => Status::Stalled,
210 OverallStatus::Ceased { .. } => Status::Ceased,
211 }
212 }
213}
214
/// Everything the health operator tracks for a single object.
#[derive(Debug)]
struct HealthState {
    /// The updates retained per worker and namespace.
    healths: PerWorkerHealthStatus,
    /// The status most recently handed to `HealthOperator::record_new_status`,
    /// or `None` if nothing has been reported yet. Used to suppress repeated
    /// reports of an unchanged status.
    last_reported_status: Option<OverallStatus>,
    /// The most recent update that asked for a halt, with its namespace.
    halt_with: Option<(StatusNamespace, HealthStatusUpdate)>,
}
221
222impl HealthState {
223 fn new(worker_count: usize) -> HealthState {
224 HealthState {
225 healths: PerWorkerHealthStatus {
226 errors_by_worker: vec![Default::default(); worker_count],
227 },
228 last_reported_status: None,
229 halt_with: None,
230 }
231 }
232}
233
/// The callbacks through which `health_operator` reports status changes and
/// requests restarts.
pub trait HealthOperator {
    /// Records a new status for `collection_id`.
    ///
    /// On the active worker, `health_operator` calls this once with `Starting`
    /// for every id it was asked to mark as starting, and afterwards whenever
    /// the status decided for an id differs from the one last reported.
    ///
    /// * `ts` - the time at which the status was decided.
    /// * `new_error` - the headline error, if the status carries one.
    /// * `hints` - the hints that accompany the status.
    /// * `namespaced_errors` - the errors retained per namespace.
    /// * `write_namespaced_map` - passed through unchanged from
    ///   `health_operator`; `DefaultWriter` records an empty namespaced map
    ///   when this is `false`.
    fn record_new_status(
        &self,
        collection_id: GlobalId,
        ts: DateTime<Utc>,
        new_status: Status,
        new_error: Option<&str>,
        hints: &BTreeSet<String>,
        namespaced_errors: &BTreeMap<StatusNamespace, String>,
        write_namespaced_map: bool,
    );
    /// Called by `health_operator` after a halting update was received for
    /// `id`, once the configured delay has elapsed. `error` is the namespace
    /// and update that requested the halt.
    fn send_halt(&self, id: GlobalId, error: Option<(StatusNamespace, HealthStatusUpdate)>);

    /// Optionally names the worker that should process all health updates.
    ///
    /// With the default of `None`, `health_operator` derives the worker from a
    /// hash of the first id it was asked to mark as starting.
    fn chosen_worker(&self) -> Option<usize> {
        None
    }
}
261
/// A `HealthOperator` that buffers status updates in memory and turns halt
/// requests into internal storage commands.
pub struct DefaultWriter {
    /// Used by `send_halt` to issue `InternalStorageCommand::SuspendAndRestart`.
    pub command_tx: InternalCommandSender,
    /// Shared buffer to which `record_new_status` appends every update.
    pub updates: Rc<RefCell<Vec<StatusUpdate>>>,
}
267
268impl HealthOperator for DefaultWriter {
269 fn record_new_status(
270 &self,
271 collection_id: GlobalId,
272 ts: DateTime<Utc>,
273 status: Status,
274 new_error: Option<&str>,
275 hints: &BTreeSet<String>,
276 namespaced_errors: &BTreeMap<StatusNamespace, String>,
277 write_namespaced_map: bool,
278 ) {
279 self.updates.borrow_mut().push(StatusUpdate {
280 id: collection_id,
281 timestamp: ts,
282 status,
283 error: new_error.map(|e| e.to_string()),
284 hints: hints.clone(),
285 namespaced_errors: if write_namespaced_map {
286 namespaced_errors
287 .iter()
288 .map(|(ns, val)| (ns.to_string(), val.clone()))
289 .collect()
290 } else {
291 BTreeMap::new()
292 },
293 replica_id: None,
294 });
295 }
296
297 fn send_halt(&self, id: GlobalId, error: Option<(StatusNamespace, HealthStatusUpdate)>) {
298 self.command_tx
299 .send(InternalStorageCommand::SuspendAndRestart {
300 id,
303 reason: format!("{:?}", error),
304 });
305 }
306}
307
/// A health update for one object in one namespace, as it flows into
/// `health_operator`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub struct HealthStatusMessage {
    /// The object the update is about. `None` stands for the operator's
    /// `halting_id`.
    pub id: Option<GlobalId>,
    /// The namespace the update belongs to.
    pub namespace: StatusNamespace,
    /// The update itself.
    pub update: HealthStatusUpdate,
}
319
320pub(crate) fn health_operator<'g, G, P>(
328 scope: &Child<'g, G, mz_repr::Timestamp>,
329 now: NowFn,
330 mark_starting: BTreeSet<GlobalId>,
332 halting_id: GlobalId,
335 object_type: &'static str,
337 health_stream: &Stream<G, HealthStatusMessage>,
339 health_operator_impl: P,
341 write_namespaced_map: bool,
343 suspend_and_restart_delay: Duration,
346) -> PressOnDropButton
347where
348 G: Scope<Timestamp = ()>,
349 P: HealthOperator + 'static,
350{
351 let healthcheck_worker_id = scope.index();
353 let worker_count = scope.peers();
354
355 let health_stream = health_stream.map(move |status| (healthcheck_worker_id, status));
357
358 let chosen_worker_id = if let Some(index) = health_operator_impl.chosen_worker() {
359 index
360 } else {
361 usize::cast_from(mark_starting.iter().next().hashed()) % worker_count
364 };
365
366 let is_active_worker = chosen_worker_id == healthcheck_worker_id;
367
368 let operator_name = format!("healthcheck({})", healthcheck_worker_id);
369 let mut health_op = AsyncOperatorBuilder::new(operator_name, scope.clone());
370
371 let health = health_stream.enter(scope);
372
373 let mut input = health_op.new_disconnected_input(
374 &health,
375 Exchange::new(move |_| u64::cast_from(chosen_worker_id)),
376 );
377
378 let button = health_op.build(move |mut _capabilities| async move {
379 let mut health_states: BTreeMap<_, _> = mark_starting
380 .iter()
381 .copied()
382 .chain([halting_id])
383 .map(|id| (id, HealthState::new(worker_count)))
384 .collect();
385
386 if is_active_worker {
388 for (id, state) in health_states.iter_mut() {
389 if mark_starting.contains(id) {
390 let status = OverallStatus::Starting;
391 let timestamp = mz_ore::now::to_datetime(now());
392 health_operator_impl.record_new_status(
393 *id,
394 timestamp,
395 (&status).into(),
396 status.error(),
397 &status.hints(),
398 status.errors().unwrap_or(&BTreeMap::new()),
399 write_namespaced_map,
400 );
401
402 state.last_reported_status = Some(status);
403 }
404 }
405 }
406
407 let mut outputs_seen = BTreeMap::<GlobalId, BTreeSet<_>>::new();
408 while let Some(event) = input.next().await {
409 if let AsyncEvent::Data(_cap, rows) = event {
410 for (worker_id, message) in rows {
411 let HealthStatusMessage {
412 id,
413 namespace: ns,
414 update: health_event,
415 } = message;
416 let id = id.unwrap_or(halting_id);
417 let HealthState {
418 healths, halt_with, ..
419 } = match health_states.get_mut(&id) {
420 Some(health) => health,
421 None => continue,
425 };
426
427 let new_round = outputs_seen
430 .entry(id)
431 .or_insert_with(BTreeSet::new)
432 .insert(ns.clone());
433
434 if !is_active_worker {
435 error!(
436 "Health messages for {object_type} {id} passed to \
437 an unexpected worker id: {healthcheck_worker_id}"
438 )
439 }
440
441 if health_event.should_halt() {
442 *halt_with = Some((ns.clone(), health_event.clone()));
443 }
444
445 healths.merge_update(worker_id, ns, health_event, !new_round);
446 }
447
448 let mut halt_with_outer = None;
449
450 while let Some((id, _)) = outputs_seen.pop_first() {
451 let HealthState {
452 healths,
453 last_reported_status,
454 halt_with,
455 } = health_states.get_mut(&id).expect("known to exist");
456
457 let new_status = healths.decide_status();
458
459 if Some(&new_status) != last_reported_status.as_ref() {
460 info!(
461 "Health transition for {object_type} {id}: \
462 {last_reported_status:?} -> {:?}",
463 Some(&new_status)
464 );
465
466 let timestamp = mz_ore::now::to_datetime(now());
467 health_operator_impl.record_new_status(
468 id,
469 timestamp,
470 (&new_status).into(),
471 new_status.error(),
472 &new_status.hints(),
473 new_status.errors().unwrap_or(&BTreeMap::new()),
474 write_namespaced_map,
475 );
476
477 *last_reported_status = Some(new_status.clone());
478 }
479
480 if halt_with_outer.is_none() && halt_with.is_some() {
482 halt_with_outer = Some((id, halt_with.clone()));
483 }
484 }
485
486 if let Some((id, halt_with)) = halt_with_outer {
491 mz_ore::soft_assert_or_log!(
492 id == halting_id,
493 "sub{object_type}s should not produce \
494 halting errors, however {:?} halted while primary \
495 {object_type} is {:?}",
496 id,
497 halting_id
498 );
499
500 info!(
501 "Broadcasting suspend-and-restart \
502 command because of {:?} after {:?} delay",
503 halt_with, suspend_and_restart_delay
504 );
505 tokio::time::sleep(suspend_and_restart_delay).await;
506 health_operator_impl.send_halt(id, halt_with);
507 }
508 }
509 }
510 });
511
512 button.press_on_drop()
513}
514
515use serde::{Deserialize, Serialize};
516
/// A health update reported for one object in one namespace.
///
/// NOTE: the declaration order of the variants matters. The derived `Ord`
/// follows it (`Running` < `Stalled` < `Ceased`), and
/// `PerWorkerHealthStatus::merge_update` uses that ordering to decide whether
/// an update may replace the one already held.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub enum HealthStatusUpdate {
    Running,
    Stalled {
        /// The error message.
        error: String,
        /// An optional hint reported alongside the error.
        hint: Option<String>,
        /// Whether `health_operator` should request a suspend-and-restart
        /// after receiving this update.
        should_halt: bool,
    },
    Ceased {
        /// The error message.
        error: String,
    },
}
532
533impl HealthStatusUpdate {
534 pub(crate) fn running() -> Self {
536 HealthStatusUpdate::Running
537 }
538
539 pub(crate) fn stalled(error: String, hint: Option<String>) -> Self {
541 HealthStatusUpdate::Stalled {
542 error,
543 hint,
544 should_halt: false,
545 }
546 }
547
548 pub(crate) fn halting(error: String, hint: Option<String>) -> Self {
550 HealthStatusUpdate::Stalled {
551 error,
552 hint,
553 should_halt: true,
554 }
555 }
556
557 pub(crate) fn should_halt(&self) -> bool {
565 match self {
566 HealthStatusUpdate::Running |
567 HealthStatusUpdate::Ceased { .. } => false,
572 HealthStatusUpdate::Stalled { should_halt, .. } => *should_halt,
573 }
574 }
575}
576
#[cfg(test)]
mod tests {
    use super::*;
    use itertools::Itertools;

    // Each test passes `health_operator_runner` a worker count, an input count,
    // the `write_namespaced_map` flag, and a script of steps. Input `i` is
    // `GlobalId::User(i)` and is referred to as `collection_index: i`; the
    // first input doubles as the halting id, so updates with `id: None` are
    // about collection 0.

    // Two workers, two inputs: statuses are tracked per collection, and a
    // stall is cleared again by a later `Running` update from the same worker.
    #[mz_ore::test]
    // Not run under Miri.
    #[cfg_attr(miri, ignore)]
    fn test_health_operator_basic() {
        use Step::*;

        health_operator_runner(
            2,
            2,
            true,
            vec![
                // Both collections are reported as starting at startup.
                AssertStatus(vec![
                    StatusToAssert {
                        collection_index: 0,
                        status: Status::Starting,
                        ..Default::default()
                    },
                    StatusToAssert {
                        collection_index: 1,
                        status: Status::Starting,
                        ..Default::default()
                    },
                ]),
                // A `Running` update without an id moves collection 0 to
                // running.
                Update(TestUpdate {
                    worker_id: 1,
                    namespace: StatusNamespace::Generator,
                    id: None,
                    update: HealthStatusUpdate::running(),
                }),
                AssertStatus(vec![StatusToAssert {
                    collection_index: 0,
                    status: Status::Running,
                    ..Default::default()
                }]),
                // The same for collection 1, addressed explicitly.
                Update(TestUpdate {
                    worker_id: 1,
                    namespace: StatusNamespace::Generator,
                    id: Some(GlobalId::User(1)),
                    update: HealthStatusUpdate::running(),
                }),
                AssertStatus(vec![StatusToAssert {
                    collection_index: 1,
                    status: Status::Running,
                    ..Default::default()
                }]),
                // A stall on the other worker stalls collection 1.
                Update(TestUpdate {
                    worker_id: 0,
                    namespace: StatusNamespace::Generator,
                    id: Some(GlobalId::User(1)),
                    update: HealthStatusUpdate::stalled("uhoh".to_string(), None),
                }),
                AssertStatus(vec![StatusToAssert {
                    collection_index: 1,
                    status: Status::Stalled,
                    error: Some("generator: uhoh".to_string()),
                    errors: Some("generator: uhoh".to_string()),
                    ..Default::default()
                }]),
                // Once that worker reports running again, so does the
                // collection.
                Update(TestUpdate {
                    worker_id: 0,
                    namespace: StatusNamespace::Generator,
                    id: Some(GlobalId::User(1)),
                    update: HealthStatusUpdate::running(),
                }),
                AssertStatus(vec![StatusToAssert {
                    collection_index: 1,
                    status: Status::Running,
                    ..Default::default()
                }]),
            ],
        );
    }

    // With `write_namespaced_map` disabled, the headline error is still
    // reported but the namespaced errors are not.
    #[mz_ore::test]
    // Not run under Miri.
    #[cfg_attr(miri, ignore)]
    fn test_health_operator_write_namespaced_map() {
        use Step::*;

        health_operator_runner(
            2,
            2,
            false,
            vec![
                AssertStatus(vec![
                    StatusToAssert {
                        collection_index: 0,
                        status: Status::Starting,
                        ..Default::default()
                    },
                    StatusToAssert {
                        collection_index: 1,
                        status: Status::Starting,
                        ..Default::default()
                    },
                ]),
                Update(TestUpdate {
                    worker_id: 0,
                    namespace: StatusNamespace::Generator,
                    id: Some(GlobalId::User(1)),
                    update: HealthStatusUpdate::stalled("uhoh".to_string(), None),
                }),
                AssertStatus(vec![StatusToAssert {
                    collection_index: 1,
                    status: Status::Stalled,
                    error: Some("generator: uhoh".to_string()),
                    errors: None,
                    ..Default::default()
                }]),
            ],
        )
    }

    // Errors from several namespaces are tracked side by side; the headline
    // error is the one of the greatest namespace that has an error.
    #[mz_ore::test]
    // Not run under Miri.
    #[cfg_attr(miri, ignore)]
    fn test_health_operator_namespaces() {
        use Step::*;

        health_operator_runner(
            2,
            1,
            true,
            vec![
                AssertStatus(vec![
                    StatusToAssert {
                        collection_index: 0,
                        status: Status::Starting,
                        ..Default::default()
                    },
                ]),
                // A stall in the `Generator` namespace.
                Update(TestUpdate {
                    worker_id: 0,
                    namespace: StatusNamespace::Generator,
                    id: None,
                    update: HealthStatusUpdate::stalled("uhoh".to_string(), None),
                }),
                AssertStatus(vec![StatusToAssert {
                    collection_index: 0,
                    status: Status::Stalled,
                    error: Some("generator: uhoh".to_string()),
                    errors: Some("generator: uhoh".to_string()),
                    ..Default::default()
                }]),
                // A stall in `Kafka` takes over the headline error, while both
                // errors show up in the namespaced map.
                Update(TestUpdate {
                    worker_id: 0,
                    namespace: StatusNamespace::Kafka,
                    id: None,
                    update: HealthStatusUpdate::stalled("uhoh".to_string(), None),
                }),
                AssertStatus(vec![StatusToAssert {
                    collection_index: 0,
                    status: Status::Stalled,
                    error: Some("kafka: uhoh".to_string()),
                    errors: Some("generator: uhoh, kafka: uhoh".to_string()),
                    ..Default::default()
                }]),
                // `Kafka` recovers; the `Generator` error is the headline
                // again.
                Update(TestUpdate {
                    worker_id: 0,
                    namespace: StatusNamespace::Kafka,
                    id: None,
                    update: HealthStatusUpdate::running(),
                }),
                AssertStatus(vec![StatusToAssert {
                    collection_index: 0,
                    status: Status::Stalled,
                    error: Some("generator: uhoh".to_string()),
                    errors: Some("generator: uhoh".to_string()),
                    ..Default::default()
                }]),
                // `Generator` recovers as well; the collection is running.
                Update(TestUpdate {
                    worker_id: 0,
                    namespace: StatusNamespace::Generator,
                    id: None,
                    update: HealthStatusUpdate::running(),
                }),
                AssertStatus(vec![StatusToAssert {
                    collection_index: 0,
                    status: Status::Running,
                    ..Default::default()
                }]),
            ],
        );
    }

    // Errors in the side-channel namespace (`Ssh`) stall the collection, but
    // its `Running` updates do not make the collection running.
    #[mz_ore::test]
    // Not run under Miri.
    #[cfg_attr(miri, ignore)]
    fn test_health_operator_namespace_side_channel() {
        use Step::*;

        health_operator_runner(
            2,
            1,
            true,
            vec![
                AssertStatus(vec![
                    StatusToAssert {
                        collection_index: 0,
                        status: Status::Starting,
                        ..Default::default()
                    },
                ]),
                // A stall in the side channel stalls the collection.
                Update(TestUpdate {
                    worker_id: 0,
                    namespace: StatusNamespace::Ssh,
                    id: None,
                    update: HealthStatusUpdate::stalled("uhoh".to_string(), None),
                }),
                AssertStatus(vec![StatusToAssert {
                    collection_index: 0,
                    status: Status::Stalled,
                    error: Some("ssh: uhoh".to_string()),
                    errors: Some("ssh: uhoh".to_string()),
                    ..Default::default()
                }]),
                // A second stall replaces the error of the first.
                Update(TestUpdate {
                    worker_id: 0,
                    namespace: StatusNamespace::Ssh,
                    id: None,
                    update: HealthStatusUpdate::stalled("uhoh2".to_string(), None),
                }),
                AssertStatus(vec![StatusToAssert {
                    collection_index: 0,
                    status: Status::Stalled,
                    error: Some("ssh: uhoh2".to_string()),
                    errors: Some("ssh: uhoh2".to_string()),
                    ..Default::default()
                }]),
                // The side channel recovering clears the error but only brings
                // the collection back to starting.
                Update(TestUpdate {
                    worker_id: 0,
                    namespace: StatusNamespace::Ssh,
                    id: None,
                    update: HealthStatusUpdate::running(),
                }),
                AssertStatus(vec![StatusToAssert {
                    collection_index: 0,
                    status: Status::Starting,
                    ..Default::default()
                }]),
                // A `Running` update from a regular namespace is what makes the
                // collection running.
                Update(TestUpdate {
                    worker_id: 0,
                    namespace: StatusNamespace::Generator,
                    id: None,
                    update: HealthStatusUpdate::running(),
                }),
                AssertStatus(vec![StatusToAssert {
                    collection_index: 0,
                    status: Status::Running,
                    ..Default::default()
                }]),
            ],
        );
    }

    // Hints of all stalled workers are combined, and follow the updates of the
    // worker that reported them.
    #[mz_ore::test]
    // Not run under Miri.
    #[cfg_attr(miri, ignore)]
    fn test_health_operator_hints() {
        use Step::*;

        health_operator_runner(
            2,
            1,
            true,
            vec![
                AssertStatus(vec![
                    StatusToAssert {
                        collection_index: 0,
                        status: Status::Starting,
                        ..Default::default()
                    },
                ]),
                // Worker 0 stalls with a hint.
                Update(TestUpdate {
                    worker_id: 0,
                    namespace: StatusNamespace::Generator,
                    id: None,
                    update: HealthStatusUpdate::stalled(
                        "uhoh".to_string(),
                        Some("hint1".to_string()),
                    ),
                }),
                AssertStatus(vec![StatusToAssert {
                    collection_index: 0,
                    status: Status::Stalled,
                    error: Some("generator: uhoh".to_string()),
                    errors: Some("generator: uhoh".to_string()),
                    hint: Some("hint1".to_string()),
                }]),
                // Worker 1 stalls with a greater error and a second hint; both
                // hints are reported.
                Update(TestUpdate {
                    worker_id: 1,
                    namespace: StatusNamespace::Generator,
                    id: None,
                    update: HealthStatusUpdate::stalled(
                        "uhoh2".to_string(),
                        Some("hint2".to_string()),
                    ),
                }),
                AssertStatus(vec![StatusToAssert {
                    collection_index: 0,
                    status: Status::Stalled,
                    error: Some("generator: uhoh2".to_string()),
                    errors: Some("generator: uhoh2".to_string()),
                    hint: Some("hint1, hint2".to_string()),
                }]),
                // Worker 1 changes its hint; the old one is gone.
                Update(TestUpdate {
                    worker_id: 1,
                    namespace: StatusNamespace::Generator,
                    id: None,
                    update: HealthStatusUpdate::stalled(
                        "uhoh2".to_string(),
                        Some("hint3".to_string()),
                    ),
                }),
                AssertStatus(vec![StatusToAssert {
                    collection_index: 0,
                    status: Status::Stalled,
                    error: Some("generator: uhoh2".to_string()),
                    errors: Some("generator: uhoh2".to_string()),
                    hint: Some("hint1, hint3".to_string()),
                }]),
                // Worker 0 recovers; its hint disappears, but worker 1 keeps
                // the collection stalled.
                Update(TestUpdate {
                    worker_id: 0,
                    namespace: StatusNamespace::Generator,
                    id: None,
                    update: HealthStatusUpdate::running(),
                }),
                AssertStatus(vec![StatusToAssert {
                    collection_index: 0,
                    status: Status::Stalled,
                    error: Some("generator: uhoh2".to_string()),
                    errors: Some("generator: uhoh2".to_string()),
                    hint: Some("hint3".to_string()),
                }]),
                // Worker 1 recovers as well; the collection is running.
                Update(TestUpdate {
                    worker_id: 1,
                    namespace: StatusNamespace::Generator,
                    id: None,
                    update: HealthStatusUpdate::running(),
                }),
                AssertStatus(vec![StatusToAssert {
                    collection_index: 0,
                    status: Status::Running,
                    ..Default::default()
                }]),
            ],
        );
    }

    use mz_ore::assert_err;
    use timely::container::CapacityContainerBuilder;
    use timely::dataflow::Scope;
    use timely::dataflow::operators::exchange::Exchange;
    use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};

    /// A status the operator is expected to record, in the flattened form
    /// produced by `TestWriter`.
    #[derive(Debug, Clone, PartialEq, Eq)]
    struct StatusToAssert {
        /// Index of the input (see `health_operator_runner`).
        collection_index: usize,
        status: Status,
        /// The headline error.
        error: Option<String>,
        /// The namespaced errors rendered as `"<ns>: <error>"` joined by
        /// `", "`, or `None` if there are none or they are not written.
        errors: Option<String>,
        /// The hints joined by `", "`, or `None` if there are none.
        hint: Option<String>,
    }

    impl Default for StatusToAssert {
        // Defaults to a `Running` status for collection 0 without errors or
        // hints.
        fn default() -> Self {
            StatusToAssert {
                collection_index: Default::default(),
                status: Status::Running,
                error: Default::default(),
                errors: Default::default(),
                hint: Default::default(),
            }
        }
    }

    /// A health update to feed into the operator.
    #[derive(Debug, Clone)]
    struct TestUpdate {
        /// The worker the update is exchanged to before it enters the health
        /// operator, i.e. the worker it appears to originate from.
        worker_id: u64,
        namespace: StatusNamespace,
        id: Option<GlobalId>,
        update: HealthStatusUpdate,
    }

    /// One step of a test script.
    #[derive(Debug, Clone)]
    enum Step {
        /// Feed an update into the operator.
        Update(TestUpdate),
        /// Wait until exactly these statuses have been recorded, in any
        /// order. At most one status per collection is supported.
        AssertStatus(Vec<StatusToAssert>),
    }

    /// A `HealthOperator` that forwards every recorded status to a channel.
    struct TestWriter {
        sender: UnboundedSender<StatusToAssert>,
        /// Maps each input id to its collection index.
        input_mapping: BTreeMap<GlobalId, usize>,
    }

    impl HealthOperator for TestWriter {
        fn record_new_status(
            &self,
            collection_id: GlobalId,
            _ts: DateTime<Utc>,
            status: Status,
            new_error: Option<&str>,
            hints: &BTreeSet<String>,
            namespaced_errors: &BTreeMap<StatusNamespace, String>,
            write_namespaced_map: bool,
        ) {
            // Send errors are ignored.
            let _ = self.sender.send(StatusToAssert {
                collection_index: *self.input_mapping.get(&collection_id).unwrap(),
                status,
                error: new_error.map(str::to_string),
                errors: if !namespaced_errors.is_empty() && write_namespaced_map {
                    Some(
                        namespaced_errors
                            .iter()
                            .map(|(ns, err)| format!("{}: {}", ns, err))
                            .join(", "),
                    )
                } else {
                    None
                },
                hint: if !hints.is_empty() {
                    Some(hints.iter().join(", "))
                } else {
                    None
                },
            });
        }

        fn send_halt(&self, _id: GlobalId, _error: Option<(StatusNamespace, HealthStatusUpdate)>) {
            // Halting is not exercised by these tests.
            unimplemented!()
        }

        fn chosen_worker(&self) -> Option<usize> {
            // Pin the operator to worker 0, which is the worker that drives the
            // steps and reads the recorded statuses.
            Some(0)
        }
    }

    /// Runs `steps` against a health operator in a timely computation with
    /// `workers` workers and `inputs` inputs.
    ///
    /// Input `i` gets the id `GlobalId::User(i)`; the first input is also used
    /// as the halting id.
    fn health_operator_runner(
        workers: usize,
        inputs: usize,
        write_namespaced_map: bool,
        steps: Vec<Step>,
    ) {
        let tokio_runtime = tokio::runtime::Runtime::new().unwrap();
        let tokio_handle = tokio_runtime.handle().clone();

        let inputs: BTreeMap<GlobalId, usize> = (0..inputs)
            .map(|index| (GlobalId::User(u64::cast_from(index)), index))
            .collect();

        timely::execute::execute(
            timely::execute::Config {
                communication: timely::CommunicationConfig::Process(workers),
                worker: Default::default(),
            },
            move |worker| {
                let steps = steps.clone();
                let inputs = inputs.clone();

                let _tokio_guard = tokio_handle.enter();
                let (in_tx, in_rx) = unbounded_channel();
                let (out_tx, mut out_rx) = unbounded_channel();

                worker.dataflow::<(), _, _>(|root_scope| {
                    root_scope
                        .clone()
                        .scoped::<mz_repr::Timestamp, _, _>("gus", |scope| {
                            let input = producer(root_scope.clone(), in_rx);
                            // Leak the returned button so that it is never
                            // dropped.
                            Box::leak(Box::new(health_operator(
                                scope,
                                mz_ore::now::SYSTEM_TIME.clone(),
                                inputs.keys().copied().collect(),
                                *inputs.first_key_value().unwrap().0,
                                "source_test",
                                &input,
                                TestWriter {
                                    sender: out_tx,
                                    input_mapping: inputs,
                                },
                                write_namespaced_map,
                                Duration::from_secs(5),
                            )));
                        });
                });

                // Only worker 0 drives the script.
                if worker.index() == 0 {
                    use Step::*;
                    for step in steps {
                        match step {
                            Update(update) => {
                                let _ = in_tx.send(update);
                            }
                            AssertStatus(mut statuses) => loop {
                                match out_rx.try_recv() {
                                    // Nothing recorded yet: step the worker
                                    // and retry after a short pause.
                                    Err(_) => {
                                        worker.step();
                                        std::thread::sleep(std::time::Duration::from_millis(50));
                                    }
                                    Ok(update) => {
                                        // Match the recorded status to the
                                        // expectation for the same collection.
                                        let pos = statuses
                                            .iter()
                                            .position(|s| {
                                                s.collection_index == update.collection_index
                                            })
                                            .unwrap();

                                        let status_to_assert = &statuses[pos];
                                        assert_eq!(&update, status_to_assert);

                                        statuses.remove(pos);
                                        if statuses.is_empty() {
                                            break;
                                        }
                                    }
                                }
                            },
                        }
                    }

                    // Nothing beyond the asserted statuses may be pending when
                    // the script ends.
                    assert_err!(out_rx.try_recv());
                }
            },
        )
        .unwrap();
    }

    /// Builds the stream of health messages for the tests.
    ///
    /// Only the operator instance on worker 0 forwards the updates it receives
    /// on `input`; the instances on other workers return immediately. Each
    /// update is exchanged to the worker named by its `worker_id` before being
    /// turned into a `HealthStatusMessage`.
    fn producer<G: Scope<Timestamp = ()>>(
        scope: G,
        mut input: UnboundedReceiver<TestUpdate>,
    ) -> Stream<G, HealthStatusMessage> {
        let mut iterator = AsyncOperatorBuilder::new("iterator".to_string(), scope.clone());
        let (output_handle, output) = iterator.new_output::<CapacityContainerBuilder<Vec<_>>>();

        let index = scope.index();
        iterator.build(|mut caps| async move {
            if index != 0 {
                return;
            }
            let mut capability = Some(caps.pop().unwrap());
            while let Some(element) = input.recv().await {
                output_handle.give(
                    capability.as_ref().unwrap(),
                    (
                        element.worker_id,
                        element.id,
                        element.namespace,
                        element.update,
                    ),
                );
            }

            // The channel is closed; release the capability.
            capability.take();
        });

        // Route by the `worker_id` element of the tuple, then drop it.
        let output = output.exchange(|d| d.0).map(|d| HealthStatusMessage {
            id: d.1,
            namespace: d.2,
            update: d.3,
        });

        output
    }
}