// mz_storage/src/healthcheck.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Healthcheck common
11
12use std::cell::RefCell;
13use std::collections::btree_map::Entry;
14use std::collections::{BTreeMap, BTreeSet};
15use std::fmt;
16use std::fmt::Debug;
17use std::rc::Rc;
18use std::time::Duration;
19
20use chrono::{DateTime, Utc};
21use differential_dataflow::Hashable;
22use futures::StreamExt;
23use mz_ore::cast::CastFrom;
24use mz_ore::now::NowFn;
25use mz_repr::GlobalId;
26use mz_storage_client::client::{Status, StatusUpdate};
27use mz_timely_util::builder_async::{
28    Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
29};
30use timely::dataflow::channels::pact::Exchange;
31use timely::dataflow::operators::vec::Map;
32use timely::dataflow::{Scope, StreamVec};
33use timely::progress::Timestamp;
34use tracing::{error, info};
35
36use crate::internal_control::{InternalCommandSender, InternalStorageCommand};
37
/// The namespace of the update. The `Ord` impl matter here, later variants are
/// displayed over earlier ones.
///
/// Some namespaces (referred to as "sidechannels") can come from any worker_id,
/// and `Running` statuses from them do not mark the entire object as running.
///
/// Ensure you update `is_sidechannel` when adding variants.
#[derive(
    Copy,
    Clone,
    Debug,
    Serialize,
    Deserialize,
    PartialEq,
    Eq,
    PartialOrd,
    Ord
)]
pub enum StatusNamespace {
    /// A normal status namespace. Any `Running` status from any worker will mark the object
    /// `Running`.
    Generator,
    /// Statuses from the Kafka source/sink machinery.
    Kafka,
    /// Statuses from the Postgres source machinery.
    Postgres,
    /// Statuses from the MySQL source machinery.
    MySql,
    /// Statuses from the SQL Server source machinery.
    SqlServer,
    /// SSH tunnel statuses. This is a sidechannel namespace (see module docs
    /// above): its `Running` statuses do not mark the whole object running.
    Ssh,
    /// Statuses from the upsert operator.
    Upsert,
    /// Statuses from decoding.
    Decode,
    /// Statuses from the Iceberg machinery.
    Iceberg,
    /// Internal statuses. Listed last so (per the derived `Ord`) they take
    /// display precedence over all other namespaces.
    Internal,
}
70
71impl StatusNamespace {
72    fn is_sidechannel(&self) -> bool {
73        matches!(self, StatusNamespace::Ssh)
74    }
75}
76
77impl fmt::Display for StatusNamespace {
78    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
79        use StatusNamespace::*;
80        match self {
81            Generator => write!(f, "generator"),
82            Kafka => write!(f, "kafka"),
83            Postgres => write!(f, "postgres"),
84            MySql => write!(f, "mysql"),
85            SqlServer => write!(f, "sql-server"),
86            Ssh => write!(f, "ssh"),
87            Upsert => write!(f, "upsert"),
88            Decode => write!(f, "decode"),
89            Internal => write!(f, "internal"),
90            Iceberg => write!(f, "iceberg"),
91        }
92    }
93}
94
/// The latest `HealthStatusUpdate` per namespace, tracked separately for each
/// worker in the cluster. The outer `Vec` is indexed by worker id.
#[derive(Debug)]
struct PerWorkerHealthStatus {
    // One per-namespace status map per worker; sized to the worker count in
    // `HealthState::new`.
    pub(crate) errors_by_worker: Vec<BTreeMap<StatusNamespace, HealthStatusUpdate>>,
}
99
impl PerWorkerHealthStatus {
    /// Merges `update` into `worker`'s status map under `namespace`.
    ///
    /// If `only_greater` is true, an existing entry is only replaced when the
    /// new update compares strictly greater under the derived `Ord` on
    /// `HealthStatusUpdate` (later variants take precedence); otherwise the
    /// new update unconditionally overwrites the entry.
    fn merge_update(
        &mut self,
        worker: usize,
        namespace: StatusNamespace,
        update: HealthStatusUpdate,
        only_greater: bool,
    ) {
        // NOTE(review): indexes directly; assumes `worker` is a valid worker
        // id < errors_by_worker.len() — panics otherwise.
        let errors = &mut self.errors_by_worker[worker];
        match errors.entry(namespace) {
            Entry::Vacant(v) => {
                v.insert(update);
            }
            Entry::Occupied(mut o) => {
                if !only_greater || o.get() < &update {
                    o.insert(update);
                }
            }
        }
    }

    /// Folds all per-worker, per-namespace statuses into a single
    /// `OverallStatus`.
    ///
    /// Rules visible below:
    /// - A `Running` from any worker in a non-sidechannel namespace upgrades
    ///   the result from `Starting` to `Running`.
    /// - Errors are collected per namespace, keeping the lexicographically
    ///   greatest error string per namespace (for determinism across workers).
    /// - If any errors exist, the result is `Stalled`, headlined by the error
    ///   of the greatest namespace (the `Ord` on `StatusNamespace` encodes
    ///   display precedence).
    fn decide_status(&self) -> OverallStatus {
        let mut output_status = OverallStatus::Starting;
        let mut namespaced_errors: BTreeMap<StatusNamespace, String> = BTreeMap::new();
        let mut hints: BTreeSet<String> = BTreeSet::new();

        for status in self.errors_by_worker.iter() {
            for (ns, ns_status) in status.iter() {
                match ns_status {
                    // HealthStatusUpdate::Ceased is currently unused, so just
                    // treat it as if it were a normal error.
                    //
                    // TODO: redesign ceased status database-issues#7687
                    HealthStatusUpdate::Ceased { error } => {
                        // Keep the greatest error string seen for this namespace.
                        if Some(error) > namespaced_errors.get(ns).as_deref() {
                            namespaced_errors.insert(*ns, error.to_string());
                        }
                    }
                    HealthStatusUpdate::Stalled { error, hint, .. } => {
                        if Some(error) > namespaced_errors.get(ns).as_deref() {
                            namespaced_errors.insert(*ns, error.to_string());
                        }

                        // Hints are accumulated across all namespaces/workers.
                        if let Some(hint) = hint {
                            hints.insert(hint.to_string());
                        }
                    }
                    HealthStatusUpdate::Running => {
                        // Sidechannel namespaces (e.g. ssh) cannot mark the
                        // object as running on their own.
                        if !ns.is_sidechannel() {
                            output_status = OverallStatus::Running;
                        }
                    }
                }
            }
        }

        if !namespaced_errors.is_empty() {
            // Pick the most important error: `last_key_value` returns the
            // greatest namespace, i.e. the highest display precedence.
            let (ns, err) = namespaced_errors.last_key_value().unwrap();
            output_status = OverallStatus::Stalled {
                error: format!("{}: {}", ns, err),
                hints,
                namespaced_errors,
            }
        }

        output_status
    }
}
169
/// The overall status of an object, as decided by `PerWorkerHealthStatus::decide_status`
/// from the per-worker, per-namespace updates.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub enum OverallStatus {
    /// The object is starting up and has not yet reported running.
    Starting,
    /// At least one non-sidechannel namespace reported `Running`.
    Running,
    /// At least one namespace reported an error.
    Stalled {
        /// Headline error: the error of the highest-precedence namespace,
        /// formatted as "<namespace>: <error>".
        error: String,
        /// All hints accumulated from stalled updates.
        hints: BTreeSet<String>,
        /// The greatest error string per namespace.
        namespaced_errors: BTreeMap<StatusNamespace, String>,
    },
    /// The object has permanently ceased.
    Ceased {
        error: String,
    },
}
183
184impl OverallStatus {
185    /// The user-readable error string, if there is one.
186    pub(crate) fn error(&self) -> Option<&str> {
187        match self {
188            OverallStatus::Starting | OverallStatus::Running => None,
189            OverallStatus::Stalled { error, .. } | OverallStatus::Ceased { error, .. } => {
190                Some(error)
191            }
192        }
193    }
194
195    /// A set of namespaced errors, if there are any.
196    pub(crate) fn errors(&self) -> Option<&BTreeMap<StatusNamespace, String>> {
197        match self {
198            OverallStatus::Starting | OverallStatus::Running | OverallStatus::Ceased { .. } => None,
199            OverallStatus::Stalled {
200                namespaced_errors, ..
201            } => Some(namespaced_errors),
202        }
203    }
204
205    /// A set of hints, if there are any.
206    pub(crate) fn hints(&self) -> BTreeSet<String> {
207        match self {
208            OverallStatus::Starting | OverallStatus::Running | OverallStatus::Ceased { .. } => {
209                BTreeSet::new()
210            }
211            OverallStatus::Stalled { hints, .. } => hints.clone(),
212        }
213    }
214}
215
216impl<'a> From<&'a OverallStatus> for Status {
217    fn from(val: &'a OverallStatus) -> Self {
218        match val {
219            OverallStatus::Starting => Status::Starting,
220            OverallStatus::Running => Status::Running,
221            OverallStatus::Stalled { .. } => Status::Stalled,
222            OverallStatus::Ceased { .. } => Status::Ceased,
223        }
224    }
225}
226
/// All health bookkeeping for a single object managed by the health operator.
#[derive(Debug)]
struct HealthState {
    // Latest updates, per worker and namespace.
    healths: PerWorkerHealthStatus,
    // The last `OverallStatus` written out, used to suppress duplicate writes.
    // `None` until the initial `Starting` status is recorded.
    last_reported_status: Option<OverallStatus>,
    // Set when an update requested a halt; drained by the operator main loop.
    halt_with: Option<(StatusNamespace, HealthStatusUpdate)>,
}
233
234impl HealthState {
235    fn new(worker_count: usize) -> HealthState {
236        HealthState {
237            healths: PerWorkerHealthStatus {
238                errors_by_worker: vec![Default::default(); worker_count],
239            },
240            last_reported_status: None,
241            halt_with: None,
242        }
243    }
244}
245
/// A trait that lets a user configure the `health_operator` with custom
/// behavior. This is mostly useful for testing, and the [`DefaultWriter`]
/// should be the correct implementation for everyone.
pub trait HealthOperator {
    /// Record a new status.
    ///
    /// `new_error`, `hints`, and `namespaced_errors` carry the detail of the
    /// status; `write_namespaced_map` controls whether the namespaced map is
    /// actually persisted (see the TODO below).
    fn record_new_status(
        &self,
        collection_id: GlobalId,
        ts: DateTime<Utc>,
        new_status: Status,
        new_error: Option<&str>,
        hints: &BTreeSet<String>,
        namespaced_errors: &BTreeMap<StatusNamespace, String>,
        // TODO(guswynn): not urgent:
        // Ideally this would be entirely included in the `DefaultWriter`, but that
        // requires a fairly heavy change to the `health_operator`, which hardcodes
        // some use of persist. For now we just leave it and ignore it in tests.
        write_namespaced_map: bool,
    );

    /// Signal that the dataflow for `id` should be halted (and restarted),
    /// optionally carrying the update that caused the halt.
    fn send_halt(&self, id: GlobalId, error: Option<(StatusNamespace, HealthStatusUpdate)>);

    /// Optionally override the chosen worker index. Default is semi-random.
    /// Only useful for tests.
    fn chosen_worker(&self) -> Option<usize> {
        None
    }
}
273
/// A default `HealthOperator` for use in normal cases.
pub struct DefaultWriter {
    /// Channel for broadcasting internal storage commands (used to request
    /// suspend-and-restart).
    pub command_tx: InternalCommandSender,
    /// Shared buffer of status updates, drained elsewhere for persistence.
    pub updates: Rc<RefCell<Vec<StatusUpdate>>>,
}
279
280impl HealthOperator for DefaultWriter {
281    fn record_new_status(
282        &self,
283        collection_id: GlobalId,
284        ts: DateTime<Utc>,
285        status: Status,
286        new_error: Option<&str>,
287        hints: &BTreeSet<String>,
288        namespaced_errors: &BTreeMap<StatusNamespace, String>,
289        write_namespaced_map: bool,
290    ) {
291        self.updates.borrow_mut().push(StatusUpdate {
292            id: collection_id,
293            timestamp: ts,
294            status,
295            error: new_error.map(|e| e.to_string()),
296            hints: hints.clone(),
297            namespaced_errors: if write_namespaced_map {
298                namespaced_errors
299                    .iter()
300                    .map(|(ns, val)| (ns.to_string(), val.clone()))
301                    .collect()
302            } else {
303                BTreeMap::new()
304            },
305            replica_id: None,
306        });
307    }
308
309    fn send_halt(&self, id: GlobalId, error: Option<(StatusNamespace, HealthStatusUpdate)>) {
310        self.command_tx
311            .send(InternalStorageCommand::SuspendAndRestart {
312                // Suspend and restart is expected to operate on the primary object and
313                // not any of the sub-objects
314                id,
315                reason: format!("{:?}", error),
316            });
317    }
318}
319
/// A health message consumed by the `health_operator`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub struct HealthStatusMessage {
    /// The object that this status message is about. When None, it refers to the entire ingestion
    /// as a whole. When Some, it refers to a specific subsource.
    pub id: Option<GlobalId>,
    /// The namespace of the health update.
    pub namespace: StatusNamespace,
    /// The update itself.
    pub update: HealthStatusUpdate,
}
331
/// Writes updates that come across `health_stream` to the collection's status shards, as identified
/// by their `CollectionMetadata`.
///
/// Only one worker will be active and write to the status shard.
///
/// The ids that come across `health_stream` must be members of `mark_starting` (or `None`,
/// meaning the `halting_id`); updates for unknown ids are dropped.
pub(crate) fn health_operator<'scope, T: Timestamp, P>(
    scope: Scope<'scope, T>,
    now: NowFn,
    // A set of id's that should be marked as `HealthStatusUpdate::starting()` during startup.
    mark_starting: BTreeSet<GlobalId>,
    // An id that is allowed to halt the dataflow. Others are ignored, and panic during debug
    // mode.
    halting_id: GlobalId,
    // A description of the object type we are writing status updates about. Used in log lines.
    object_type: &'static str,
    // An indexed stream of health updates. Indexes are configured in `configs`.
    health_stream: StreamVec<'scope, T, HealthStatusMessage>,
    // An impl of `HealthOperator` that configures the output behavior of this operator.
    health_operator_impl: P,
    // Whether or not we should actually write namespaced errors in the `details` column.
    write_namespaced_map: bool,
    // How long to wait before initiating a `SuspendAndRestart` command, to
    // prevent hot restart loops.
    suspend_and_restart_delay: Duration,
) -> PressOnDropButton
where
    P: HealthOperator + 'static,
{
    // Derived config options
    let healthcheck_worker_id = scope.index();
    let worker_count = scope.peers();

    // Inject the originating worker id to each item before exchanging to the chosen worker
    let health_stream = health_stream.map(move |status| (healthcheck_worker_id, status));

    let chosen_worker_id = if let Some(index) = health_operator_impl.chosen_worker() {
        index
    } else {
        // We'll route all the work to a single arbitrary worker;
        // there's not much to do, and we need a global view.
        usize::cast_from(mark_starting.iter().next().hashed()) % worker_count
    };

    let is_active_worker = chosen_worker_id == healthcheck_worker_id;

    let operator_name = format!("healthcheck({})", healthcheck_worker_id);
    let mut health_op = AsyncOperatorBuilder::new(operator_name, scope.clone());

    // Route every message to the single chosen worker, regardless of payload.
    let mut input = health_op.new_disconnected_input(
        health_stream,
        Exchange::new(move |_| u64::cast_from(chosen_worker_id)),
    );

    let button = health_op.build(move |mut _capabilities| async move {
        // One `HealthState` per managed object, plus the primary (halting) id.
        let mut health_states: BTreeMap<_, _> = mark_starting
            .iter()
            .copied()
            .chain([halting_id])
            .map(|id| (id, HealthState::new(worker_count)))
            .collect();

        // Write the initial starting state to the status shard for all managed objects
        if is_active_worker {
            for (id, state) in health_states.iter_mut() {
                let status = OverallStatus::Starting;
                let timestamp = mz_ore::now::to_datetime(now());
                health_operator_impl.record_new_status(
                    *id,
                    timestamp,
                    (&status).into(),
                    status.error(),
                    &status.hints(),
                    status.errors().unwrap_or(&BTreeMap::new()),
                    write_namespaced_map,
                );

                state.last_reported_status = Some(status);
            }
        }

        // Per id: the set of namespaces already seen in the current batch.
        // Drained after each batch; also drives which ids get re-decided.
        let mut outputs_seen = BTreeMap::<GlobalId, BTreeSet<_>>::new();
        while let Some(event) = input.next().await {
            if let AsyncEvent::Data(_cap, rows) = event {
                for (worker_id, message) in rows {
                    let HealthStatusMessage {
                        id,
                        namespace: ns,
                        update: health_event,
                    } = message;
                    // `None` refers to the primary object.
                    let id = id.unwrap_or(halting_id);
                    let HealthState {
                        healths, halt_with, ..
                    } = match health_states.get_mut(&id) {
                        Some(health) => health,
                        // This is a health status update for a sub-object_type that we did not request to
                        // be generated, which means it doesn't have a GlobalId and should not be
                        // propagated to the shard.
                        None => continue,
                    };

                    // Its important to track `new_round` per-namespace, so namespaces are reasoned
                    // about in `merge_update` independently.
                    let new_round = outputs_seen
                        .entry(id)
                        .or_insert_with(BTreeSet::new)
                        .insert(ns.clone());

                    if !is_active_worker {
                        error!(
                            "Health messages for {object_type} {id} passed to \
                              an unexpected worker id: {healthcheck_worker_id}"
                        )
                    }

                    if health_event.should_halt() {
                        *halt_with = Some((ns.clone(), health_event.clone()));
                    }

                    // First update of a namespace in a batch overwrites; later
                    // ones only replace if strictly greater (see `merge_update`).
                    healths.merge_update(worker_id, ns, health_event, !new_round);
                }

                let mut halt_with_outer = None;

                // Re-decide the overall status of every id touched in this
                // batch, draining `outputs_seen` for the next round.
                while let Some((id, _)) = outputs_seen.pop_first() {
                    let HealthState {
                        healths,
                        last_reported_status,
                        halt_with,
                    } = health_states.get_mut(&id).expect("known to exist");

                    let new_status = healths.decide_status();

                    // Only write out (and log) actual transitions.
                    if Some(&new_status) != last_reported_status.as_ref() {
                        info!(
                            "Health transition for {object_type} {id}: \
                                  {last_reported_status:?} -> {:?}",
                            Some(&new_status)
                        );

                        let timestamp = mz_ore::now::to_datetime(now());
                        health_operator_impl.record_new_status(
                            id,
                            timestamp,
                            (&new_status).into(),
                            new_status.error(),
                            &new_status.hints(),
                            new_status.errors().unwrap_or(&BTreeMap::new()),
                            write_namespaced_map,
                        );

                        *last_reported_status = Some(new_status.clone());
                    }

                    // Set halt with if None.
                    if halt_with_outer.is_none() && halt_with.is_some() {
                        halt_with_outer = Some((id, halt_with.clone()));
                    }
                }

                // TODO(aljoscha): Instead of threading through the
                // `should_halt` bit, we can give an internal command sender
                // directly to the places where `should_halt = true` originates.
                // We should definitely do that, but this is okay for a PoC.
                if let Some((id, halt_with)) = halt_with_outer {
                    mz_ore::soft_assert_or_log!(
                        id == halting_id,
                        "sub{object_type}s should not produce \
                        halting errors, however {:?} halted while primary \
                                            {object_type} is {:?}",
                        id,
                        halting_id
                    );

                    info!(
                        "Broadcasting suspend-and-restart \
                        command because of {:?} after {:?} delay",
                        halt_with, suspend_and_restart_delay
                    );
                    // Delay the halt to avoid hot restart loops.
                    tokio::time::sleep(suspend_and_restart_delay).await;
                    health_operator_impl.send_halt(id, halt_with);
                }
            }
        }
    });

    button.press_on_drop()
}
521
522use serde::{Deserialize, Serialize};
523
/// NB: we derive Ord here, so the enum order matters. Generally, statuses later in the list
/// take precedence over earlier ones: so if one worker is stalled, we'll consider the entire
/// source to be stalled.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub enum HealthStatusUpdate {
    /// The object is running normally.
    Running,
    /// The object is stalled with an error; optionally carries a hint for the
    /// user and whether the dataflow should be halted and restarted.
    Stalled {
        error: String,
        hint: Option<String>,
        should_halt: bool,
    },
    /// The object has permanently ceased. Currently unused; see
    /// database-issues#7687.
    Ceased {
        error: String,
    },
}
539
540impl HealthStatusUpdate {
541    /// Generates a running [`HealthStatusUpdate`].
542    pub(crate) fn running() -> Self {
543        HealthStatusUpdate::Running
544    }
545
546    /// Generates a non-halting [`HealthStatusUpdate`] with `update`.
547    pub(crate) fn stalled(error: String, hint: Option<String>) -> Self {
548        HealthStatusUpdate::Stalled {
549            error,
550            hint,
551            should_halt: false,
552        }
553    }
554
555    /// Generates a halting [`HealthStatusUpdate`] with `update`.
556    pub(crate) fn halting(error: String, hint: Option<String>) -> Self {
557        HealthStatusUpdate::Stalled {
558            error,
559            hint,
560            should_halt: true,
561        }
562    }
563
564    // TODO: redesign ceased status database-issues#7687
565    // Generates a ceasing [`HealthStatusUpdate`] with `update`.
566    // pub(crate) fn ceasing(error: String) -> Self {
567    //     HealthStatusUpdate::Ceased { error }
568    // }
569
570    /// Whether or not we should halt the dataflow instances and restart it.
571    pub(crate) fn should_halt(&self) -> bool {
572        match self {
573            HealthStatusUpdate::Running |
574            // HealthStatusUpdate::Ceased should never halt because it can occur
575            // at the subsource level and should not cause the entire dataflow
576            // to halt. Instead, the dataflow itself should handle shutting
577            // itself down if need be.
578            HealthStatusUpdate::Ceased { .. } => false,
579            HealthStatusUpdate::Stalled { should_halt, .. } => *should_halt,
580        }
581    }
582}
583
584#[cfg(test)]
585mod tests {
586    use super::*;
587    use itertools::Itertools;
588
589    // Actual timely tests for `health_operator`.
590
591    #[mz_ore::test]
592    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
593    fn test_health_operator_basic() {
594        use Step::*;
595
596        // Test 2 inputs across 2 workers.
597        health_operator_runner(
598            2,
599            2,
600            true,
601            vec![
602                AssertStatus(vec![
603                    // Assert both inputs started.
604                    StatusToAssert {
605                        collection_index: 0,
606                        status: Status::Starting,
607                        ..Default::default()
608                    },
609                    StatusToAssert {
610                        collection_index: 1,
611                        status: Status::Starting,
612                        ..Default::default()
613                    },
614                ]),
615                // Update and assert one is running.
616                Update(TestUpdate {
617                    worker_id: 1,
618                    namespace: StatusNamespace::Generator,
619                    id: None,
620                    update: HealthStatusUpdate::running(),
621                }),
622                AssertStatus(vec![StatusToAssert {
623                    collection_index: 0,
624                    status: Status::Running,
625                    ..Default::default()
626                }]),
627                // Assert the other can be stalled by 1 worker.
628                //
629                // TODO(guswynn): ideally we could push these updates
630                // at the same time, but because they are coming from separately
631                // workers, they could end up in different rounds, causing flakes.
632                // For now, we just do this.
633                Update(TestUpdate {
634                    worker_id: 1,
635                    namespace: StatusNamespace::Generator,
636                    id: Some(GlobalId::User(1)),
637                    update: HealthStatusUpdate::running(),
638                }),
639                AssertStatus(vec![StatusToAssert {
640                    collection_index: 1,
641                    status: Status::Running,
642                    ..Default::default()
643                }]),
644                Update(TestUpdate {
645                    worker_id: 0,
646                    namespace: StatusNamespace::Generator,
647                    id: Some(GlobalId::User(1)),
648                    update: HealthStatusUpdate::stalled("uhoh".to_string(), None),
649                }),
650                AssertStatus(vec![StatusToAssert {
651                    collection_index: 1,
652                    status: Status::Stalled,
653                    error: Some("generator: uhoh".to_string()),
654                    errors: Some("generator: uhoh".to_string()),
655                    ..Default::default()
656                }]),
657                // And that it can recover.
658                Update(TestUpdate {
659                    worker_id: 0,
660                    namespace: StatusNamespace::Generator,
661                    id: Some(GlobalId::User(1)),
662                    update: HealthStatusUpdate::running(),
663                }),
664                AssertStatus(vec![StatusToAssert {
665                    collection_index: 1,
666                    status: Status::Running,
667                    ..Default::default()
668                }]),
669            ],
670        );
671    }
672
673    #[mz_ore::test]
674    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
675    fn test_health_operator_write_namespaced_map() {
676        use Step::*;
677
678        // Test 2 inputs across 2 workers.
679        health_operator_runner(
680            2,
681            2,
682            // testing this
683            false,
684            vec![
685                AssertStatus(vec![
686                    // Assert both inputs started.
687                    StatusToAssert {
688                        collection_index: 0,
689                        status: Status::Starting,
690                        ..Default::default()
691                    },
692                    StatusToAssert {
693                        collection_index: 1,
694                        status: Status::Starting,
695                        ..Default::default()
696                    },
697                ]),
698                Update(TestUpdate {
699                    worker_id: 0,
700                    namespace: StatusNamespace::Generator,
701                    id: Some(GlobalId::User(1)),
702                    update: HealthStatusUpdate::stalled("uhoh".to_string(), None),
703                }),
704                AssertStatus(vec![StatusToAssert {
705                    collection_index: 1,
706                    status: Status::Stalled,
707                    error: Some("generator: uhoh".to_string()),
708                    errors: None,
709                    ..Default::default()
710                }]),
711            ],
712        )
713    }
714
715    #[mz_ore::test]
716    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
717    fn test_health_operator_namespaces() {
718        use Step::*;
719
720        // Test 2 inputs across 2 workers.
721        health_operator_runner(
722            2,
723            1,
724            true,
725            vec![
726                AssertStatus(vec![
727                    // Assert both inputs started.
728                    StatusToAssert {
729                        collection_index: 0,
730                        status: Status::Starting,
731                        ..Default::default()
732                    },
733                ]),
734                // Assert that we merge namespaced errors correctly.
735                //
736                // Note that these all happen on the same worker id.
737                Update(TestUpdate {
738                    worker_id: 0,
739                    namespace: StatusNamespace::Generator,
740                    id: None,
741                    update: HealthStatusUpdate::stalled("uhoh".to_string(), None),
742                }),
743                AssertStatus(vec![StatusToAssert {
744                    collection_index: 0,
745                    status: Status::Stalled,
746                    error: Some("generator: uhoh".to_string()),
747                    errors: Some("generator: uhoh".to_string()),
748                    ..Default::default()
749                }]),
750                Update(TestUpdate {
751                    worker_id: 0,
752                    namespace: StatusNamespace::Kafka,
753                    id: None,
754                    update: HealthStatusUpdate::stalled("uhoh".to_string(), None),
755                }),
756                AssertStatus(vec![StatusToAssert {
757                    collection_index: 0,
758                    status: Status::Stalled,
759                    error: Some("kafka: uhoh".to_string()),
760                    errors: Some("generator: uhoh, kafka: uhoh".to_string()),
761                    ..Default::default()
762                }]),
763                // And that it can recover.
764                Update(TestUpdate {
765                    worker_id: 0,
766                    namespace: StatusNamespace::Kafka,
767                    id: None,
768                    update: HealthStatusUpdate::running(),
769                }),
770                AssertStatus(vec![StatusToAssert {
771                    collection_index: 0,
772                    status: Status::Stalled,
773                    error: Some("generator: uhoh".to_string()),
774                    errors: Some("generator: uhoh".to_string()),
775                    ..Default::default()
776                }]),
777                Update(TestUpdate {
778                    worker_id: 0,
779                    namespace: StatusNamespace::Generator,
780                    id: None,
781                    update: HealthStatusUpdate::running(),
782                }),
783                AssertStatus(vec![StatusToAssert {
784                    collection_index: 0,
785                    status: Status::Running,
786                    ..Default::default()
787                }]),
788            ],
789        );
790    }
791
    // Verifies "sidechannel" namespace semantics (here `Ssh`): a sidechannel
    // stall does move the object to `Stalled`, but a `Running` status from a
    // sidechannel alone does NOT mark the object running — only a default
    // namespace (here `Generator`) can do that.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
    fn test_health_operator_namespace_side_channel() {
        use Step::*;

        health_operator_runner(
            // 2 workers, 1 input, and write the namespaced error map.
            2,
            1,
            true,
            vec![
                AssertStatus(vec![
                    // Assert the input started.
                    StatusToAssert {
                        collection_index: 0,
                        status: Status::Starting,
                        ..Default::default()
                    },
                ]),
                // Assert that sidechannel namespaces don't downgrade the status
                //
                // Note that these all happen on the same worker id.
                Update(TestUpdate {
                    worker_id: 0,
                    namespace: StatusNamespace::Ssh,
                    id: None,
                    update: HealthStatusUpdate::stalled("uhoh".to_string(), None),
                }),
                AssertStatus(vec![StatusToAssert {
                    collection_index: 0,
                    status: Status::Stalled,
                    error: Some("ssh: uhoh".to_string()),
                    errors: Some("ssh: uhoh".to_string()),
                    ..Default::default()
                }]),
                // A later stall in the same namespace replaces the error.
                Update(TestUpdate {
                    worker_id: 0,
                    namespace: StatusNamespace::Ssh,
                    id: None,
                    update: HealthStatusUpdate::stalled("uhoh2".to_string(), None),
                }),
                AssertStatus(vec![StatusToAssert {
                    collection_index: 0,
                    status: Status::Stalled,
                    error: Some("ssh: uhoh2".to_string()),
                    errors: Some("ssh: uhoh2".to_string()),
                    ..Default::default()
                }]),
                Update(TestUpdate {
                    worker_id: 0,
                    namespace: StatusNamespace::Ssh,
                    id: None,
                    update: HealthStatusUpdate::running(),
                }),
                // We haven't started running yet, as a `Default` namespace hasn't told us.
                AssertStatus(vec![StatusToAssert {
                    collection_index: 0,
                    status: Status::Starting,
                    ..Default::default()
                }]),
                // A `Running` from a default namespace does mark the object running.
                Update(TestUpdate {
                    worker_id: 0,
                    namespace: StatusNamespace::Generator,
                    id: None,
                    update: HealthStatusUpdate::running(),
                }),
                AssertStatus(vec![StatusToAssert {
                    collection_index: 0,
                    status: Status::Running,
                    ..Default::default()
                }]),
            ],
        );
    }
865
    // Verifies that hints from different workers are merged (comma-separated,
    // in sorted order — they live in a `BTreeSet`), that a worker's hint is
    // replaced by its newest update, and that each worker's hint is dropped
    // once that worker reports `Running` again.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
    fn test_health_operator_hints() {
        use Step::*;

        health_operator_runner(
            // 2 workers, 1 input, and write the namespaced error map.
            2,
            1,
            true,
            vec![
                AssertStatus(vec![
                    // Assert the input started.
                    StatusToAssert {
                        collection_index: 0,
                        status: Status::Starting,
                        ..Default::default()
                    },
                ]),
                // Note that these all happen across worker ids.
                Update(TestUpdate {
                    worker_id: 0,
                    namespace: StatusNamespace::Generator,
                    id: None,
                    update: HealthStatusUpdate::stalled(
                        "uhoh".to_string(),
                        Some("hint1".to_string()),
                    ),
                }),
                AssertStatus(vec![StatusToAssert {
                    collection_index: 0,
                    status: Status::Stalled,
                    error: Some("generator: uhoh".to_string()),
                    errors: Some("generator: uhoh".to_string()),
                    hint: Some("hint1".to_string()),
                }]),
                // A stall from the other worker merges its hint in.
                Update(TestUpdate {
                    worker_id: 1,
                    namespace: StatusNamespace::Generator,
                    id: None,
                    update: HealthStatusUpdate::stalled(
                        "uhoh2".to_string(),
                        Some("hint2".to_string()),
                    ),
                }),
                AssertStatus(vec![StatusToAssert {
                    collection_index: 0,
                    status: Status::Stalled,
                    // Note the error sorts later so we just use that.
                    error: Some("generator: uhoh2".to_string()),
                    errors: Some("generator: uhoh2".to_string()),
                    hint: Some("hint1, hint2".to_string()),
                }]),
                // Update one of the hints
                Update(TestUpdate {
                    worker_id: 1,
                    namespace: StatusNamespace::Generator,
                    id: None,
                    update: HealthStatusUpdate::stalled(
                        "uhoh2".to_string(),
                        Some("hint3".to_string()),
                    ),
                }),
                AssertStatus(vec![StatusToAssert {
                    collection_index: 0,
                    status: Status::Stalled,
                    // Note the error sorts later so we just use that.
                    error: Some("generator: uhoh2".to_string()),
                    errors: Some("generator: uhoh2".to_string()),
                    hint: Some("hint1, hint3".to_string()),
                }]),
                // Assert recovery.
                Update(TestUpdate {
                    worker_id: 0,
                    namespace: StatusNamespace::Generator,
                    id: None,
                    update: HealthStatusUpdate::running(),
                }),
                // Worker 0 recovered, so its hint is gone; worker 1's stall
                // (and `hint3`) remain.
                AssertStatus(vec![StatusToAssert {
                    collection_index: 0,
                    status: Status::Stalled,
                    // Note the error sorts later so we just use that.
                    error: Some("generator: uhoh2".to_string()),
                    errors: Some("generator: uhoh2".to_string()),
                    hint: Some("hint3".to_string()),
                }]),
                Update(TestUpdate {
                    worker_id: 1,
                    namespace: StatusNamespace::Generator,
                    id: None,
                    update: HealthStatusUpdate::running(),
                }),
                AssertStatus(vec![StatusToAssert {
                    collection_index: 0,
                    status: Status::Running,
                    ..Default::default()
                }]),
            ],
        );
    }
965
966    // The below is ALL test infrastructure for the above
967
968    use mz_ore::assert_err;
969    use timely::container::CapacityContainerBuilder;
970    use timely::dataflow::Scope;
971    use timely::dataflow::operators::Enter;
972    use timely::dataflow::operators::exchange::Exchange;
973    use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
974
    /// A status to assert.
    #[derive(Debug, Clone, PartialEq, Eq)]
    struct StatusToAssert {
        // Dense index of the collection this status is about; see the
        // `input_mapping` built in `health_operator_runner`.
        collection_index: usize,
        // The expected status.
        status: Status,
        // The expected top-level error, if any.
        error: Option<String>,
        // The expected namespaced errors, rendered as `ns: err` pairs
        // joined with `, ` (mirrors `TestWriter::record_new_status`).
        errors: Option<String>,
        // The expected hints, joined with `, ` in `BTreeSet` (sorted) order.
        hint: Option<String>,
    }
984
985    impl Default for StatusToAssert {
986        fn default() -> Self {
987            StatusToAssert {
988                collection_index: Default::default(),
989                status: Status::Running,
990                error: Default::default(),
991                errors: Default::default(),
992                hint: Default::default(),
993            }
994        }
995    }
996
    /// An update to push into the operator.
    /// Can come from any worker, and from any input.
    #[derive(Debug, Clone)]
    struct TestUpdate {
        // Worker the update should appear to originate from; `producer`
        // exchanges on this value to route the update to that worker.
        worker_id: u64,
        // Namespace the update is reported under.
        namespace: StatusNamespace,
        // Object the update is about. All tests in this chunk pass `None`;
        // NOTE(review): presumably `None` targets the primary/halting object —
        // confirm against `health_operator`.
        id: Option<GlobalId>,
        // The health transition itself.
        update: HealthStatusUpdate,
    }
1006
    /// A single action the test driver performs against the operator.
    #[derive(Debug, Clone)]
    enum Step {
        /// Insert a new health update.
        Update(TestUpdate),
        /// Assert a set of outputs. Note that these should
        /// have unique `collection_index`'s, because the driver matches each
        /// received status to an expectation by that index.
        AssertStatus(Vec<StatusToAssert>),
    }
1015
    /// A `HealthOperator` implementation that forwards every recorded status
    /// over a channel so the test driver can assert on it.
    struct TestWriter {
        // Delivers each recorded status to the asserting side of the test.
        sender: UnboundedSender<StatusToAssert>,
        // Maps collection `GlobalId`s back to the dense indices used by
        // `StatusToAssert::collection_index`.
        input_mapping: BTreeMap<GlobalId, usize>,
    }
1020
1021    impl HealthOperator for TestWriter {
1022        fn record_new_status(
1023            &self,
1024            collection_id: GlobalId,
1025            _ts: DateTime<Utc>,
1026            status: Status,
1027            new_error: Option<&str>,
1028            hints: &BTreeSet<String>,
1029            namespaced_errors: &BTreeMap<StatusNamespace, String>,
1030            write_namespaced_map: bool,
1031        ) {
1032            let _ = self.sender.send(StatusToAssert {
1033                collection_index: *self.input_mapping.get(&collection_id).unwrap(),
1034                status,
1035                error: new_error.map(str::to_string),
1036                errors: if !namespaced_errors.is_empty() && write_namespaced_map {
1037                    Some(
1038                        namespaced_errors
1039                            .iter()
1040                            .map(|(ns, err)| format!("{}: {}", ns, err))
1041                            .join(", "),
1042                    )
1043                } else {
1044                    None
1045                },
1046                hint: if !hints.is_empty() {
1047                    Some(hints.iter().join(", "))
1048                } else {
1049                    None
1050                },
1051            });
1052        }
1053
1054        fn send_halt(&self, _id: GlobalId, _error: Option<(StatusNamespace, HealthStatusUpdate)>) {
1055            // Not yet unit-tested
1056            unimplemented!()
1057        }
1058
1059        fn chosen_worker(&self) -> Option<usize> {
1060            // We input and assert outputs on the first worker.
1061            Some(0)
1062        }
1063    }
1064
    /// Setup a `health_operator` with a set number of workers and inputs, and the
    /// steps on the first worker.
    ///
    /// Spins up a multi-worker timely cluster, builds the operator under test
    /// inside a nested timestamped scope, then drives it from worker 0:
    /// `Update`s are fed through an input channel and `AssertStatus` steps are
    /// matched against what `TestWriter` emits on the output channel.
    fn health_operator_runner(
        workers: usize,
        inputs: usize,
        write_namespaced_map: bool,
        steps: Vec<Step>,
    ) {
        // Entered on each worker thread below so code under test can rely on
        // a tokio runtime being available.
        let tokio_runtime = tokio::runtime::Runtime::new().unwrap();
        let tokio_handle = tokio_runtime.handle().clone();

        // Give each input a dense index (0..inputs) keyed by a synthetic
        // `GlobalId::User`; `TestWriter` uses this map to translate ids back
        // into `collection_index`'s.
        let inputs: BTreeMap<GlobalId, usize> = (0..inputs)
            .map(|index| (GlobalId::User(u64::cast_from(index)), index))
            .collect();

        timely::execute::execute(
            timely::execute::Config {
                communication: timely::CommunicationConfig::Process(workers),
                worker: Default::default(),
            },
            move |worker| {
                let steps = steps.clone();
                let inputs = inputs.clone();

                let _tokio_guard = tokio_handle.enter();
                // Feeds `TestUpdate`s into the `producer` operator.
                let (in_tx, in_rx) = unbounded_channel();
                // Receives statuses recorded by `TestWriter`.
                let (out_tx, mut out_rx) = unbounded_channel();

                worker.dataflow::<(), _, _>(|root_scope| {
                    root_scope
                        .clone()
                        .scoped::<mz_repr::Timestamp, _, _>("gus", |scope| {
                            let input = producer(root_scope.clone(), in_rx).enter(scope);
                            // Leak the operator's return value so the dataflow
                            // is never torn down while the test is stepping it.
                            Box::leak(Box::new(health_operator(
                                scope,
                                mz_ore::now::SYSTEM_TIME.clone(),
                                inputs.keys().copied().collect(),
                                // The first input id is passed separately.
                                // NOTE(review): presumably the primary/halting
                                // id — confirm against `health_operator`.
                                *inputs.first_key_value().unwrap().0,
                                "source_test",
                                input,
                                TestWriter {
                                    sender: out_tx,
                                    input_mapping: inputs,
                                },
                                write_namespaced_map,
                                Duration::from_secs(5),
                            )));
                        });
                });

                // We arbitrarily do all the testing on the first worker.
                if worker.index() == 0 {
                    use Step::*;
                    for step in steps {
                        match step {
                            Update(update) => {
                                let _ = in_tx.send(update);
                            }
                            // Step the dataflow until every expected status
                            // (matched by `collection_index`) has arrived and
                            // been asserted.
                            AssertStatus(mut statuses) => loop {
                                match out_rx.try_recv() {
                                    Err(_) => {
                                        // Nothing yet: drive the dataflow and
                                        // back off briefly.
                                        worker.step();
                                        // This makes testing easier.
                                        std::thread::sleep(std::time::Duration::from_millis(50));
                                    }
                                    Ok(update) => {
                                        let pos = statuses
                                            .iter()
                                            .position(|s| {
                                                s.collection_index == update.collection_index
                                            })
                                            .unwrap();

                                        let status_to_assert = &statuses[pos];
                                        assert_eq!(&update, status_to_assert);

                                        statuses.remove(pos);
                                        if statuses.is_empty() {
                                            break;
                                        }
                                    }
                                }
                            },
                        }
                    }

                    // Assert that nothing is left in the channel.
                    assert_err!(out_rx.try_recv());
                }
            },
        )
        .unwrap();
    }
1158
    /// Produces a stream of `HealthStatusMessage`s from the input channel.
    ///
    /// Only the first worker is used, all others immediately drop their capabilities and channels.
    /// After the channel is empty on the first worker, then the frontier will go to [].
    /// Also ensures that updates are routed to the correct worker based on the `TestUpdate`
    /// using an exchange.
    fn producer<'scope>(
        scope: Scope<'scope, ()>,
        mut input: UnboundedReceiver<TestUpdate>,
    ) -> StreamVec<'scope, (), HealthStatusMessage> {
        let mut iterator = AsyncOperatorBuilder::new("iterator".to_string(), scope.clone());
        let (output_handle, output) = iterator.new_output::<CapacityContainerBuilder<Vec<_>>>();

        let index = scope.index();
        iterator.build(|mut caps| async move {
            // We input and assert outputs on the first worker.
            if index != 0 {
                return;
            }
            // Hold the capability for as long as the channel may still yield
            // updates, so the output frontier stays open.
            let mut capability = Some(caps.pop().unwrap());
            while let Some(element) = input.recv().await {
                // Emit a tuple whose first field is the routing key for the
                // exchange below.
                output_handle.give(
                    capability.as_ref().unwrap(),
                    (
                        element.worker_id,
                        element.id,
                        element.namespace,
                        element.update,
                    ),
                );
            }

            // Channel closed: drop the capability so the frontier empties.
            capability.take();
        });

        // Route each tuple to the worker named by its `worker_id`, then strip
        // the routing key off into a `HealthStatusMessage`.
        let output = output.exchange(|d| d.0).map(|d| HealthStatusMessage {
            id: d.1,
            namespace: d.2,
            update: d.3,
        });

        output
    }
1202}