mz_storage/
healthcheck.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Healthcheck common
11
12use std::cell::RefCell;
13use std::collections::btree_map::Entry;
14use std::collections::{BTreeMap, BTreeSet};
15use std::fmt;
16use std::fmt::Debug;
17use std::rc::Rc;
18use std::time::Duration;
19
20use chrono::{DateTime, Utc};
21use differential_dataflow::Hashable;
22use futures::StreamExt;
23use mz_ore::cast::CastFrom;
24use mz_ore::now::NowFn;
25use mz_repr::GlobalId;
26use mz_storage_client::client::{Status, StatusUpdate};
27use mz_timely_util::builder_async::{
28    Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
29};
30use timely::dataflow::channels::pact::Exchange;
31use timely::dataflow::operators::Map;
32use timely::dataflow::{Scope, Stream};
33use tracing::{error, info};
34
35use crate::internal_control::{InternalCommandSender, InternalStorageCommand};
36
/// The namespace of the update. The `Ord` impl matters here: later variants are
/// displayed over earlier ones.
///
/// Some namespaces (referred to as "sidechannels") can come from any worker_id,
/// and `Running` statuses from them do not mark the entire object as running.
///
/// Ensure you update `is_sidechannel` when adding variants.
#[derive(Copy, Clone, Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub enum StatusNamespace {
    /// A normal status namespace. Any `Running` status from any worker will mark the object
    /// `Running`.
    Generator,
    Kafka,
    Postgres,
    MySql,
    SqlServer,
    /// The only namespace that `is_sidechannel` currently reports as a sidechannel.
    Ssh,
    Upsert,
    Decode,
    Internal,
}
58
59impl StatusNamespace {
60    fn is_sidechannel(&self) -> bool {
61        matches!(self, StatusNamespace::Ssh)
62    }
63}
64
65impl fmt::Display for StatusNamespace {
66    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
67        use StatusNamespace::*;
68        match self {
69            Generator => write!(f, "generator"),
70            Kafka => write!(f, "kafka"),
71            Postgres => write!(f, "postgres"),
72            MySql => write!(f, "mysql"),
73            SqlServer => write!(f, "sql-server"),
74            Ssh => write!(f, "ssh"),
75            Upsert => write!(f, "upsert"),
76            Decode => write!(f, "decode"),
77            Internal => write!(f, "internal"),
78        }
79    }
80}
81
/// The health update currently tracked for every (worker, namespace) pair of one object.
#[derive(Debug)]
struct PerWorkerHealthStatus {
    /// One map per worker, indexed by worker id. Despite the name, the maps also hold
    /// `HealthStatusUpdate::Running` entries, not only errors.
    pub(crate) errors_by_worker: Vec<BTreeMap<StatusNamespace, HealthStatusUpdate>>,
}
86
87impl PerWorkerHealthStatus {
88    fn merge_update(
89        &mut self,
90        worker: usize,
91        namespace: StatusNamespace,
92        update: HealthStatusUpdate,
93        only_greater: bool,
94    ) {
95        let errors = &mut self.errors_by_worker[worker];
96        match errors.entry(namespace) {
97            Entry::Vacant(v) => {
98                v.insert(update);
99            }
100            Entry::Occupied(mut o) => {
101                if !only_greater || o.get() < &update {
102                    o.insert(update);
103                }
104            }
105        }
106    }
107
108    fn decide_status(&self) -> OverallStatus {
109        let mut output_status = OverallStatus::Starting;
110        let mut namespaced_errors: BTreeMap<StatusNamespace, String> = BTreeMap::new();
111        let mut hints: BTreeSet<String> = BTreeSet::new();
112
113        for status in self.errors_by_worker.iter() {
114            for (ns, ns_status) in status.iter() {
115                match ns_status {
116                    // HealthStatusUpdate::Ceased is currently unused, so just
117                    // treat it as if it were a normal error.
118                    //
119                    // TODO: redesign ceased status database-issues#7687
120                    HealthStatusUpdate::Ceased { error } => {
121                        if Some(error) > namespaced_errors.get(ns).as_deref() {
122                            namespaced_errors.insert(*ns, error.to_string());
123                        }
124                    }
125                    HealthStatusUpdate::Stalled { error, hint, .. } => {
126                        if Some(error) > namespaced_errors.get(ns).as_deref() {
127                            namespaced_errors.insert(*ns, error.to_string());
128                        }
129
130                        if let Some(hint) = hint {
131                            hints.insert(hint.to_string());
132                        }
133                    }
134                    HealthStatusUpdate::Running => {
135                        if !ns.is_sidechannel() {
136                            output_status = OverallStatus::Running;
137                        }
138                    }
139                }
140            }
141        }
142
143        if !namespaced_errors.is_empty() {
144            // Pick the most important error.
145            let (ns, err) = namespaced_errors.last_key_value().unwrap();
146            output_status = OverallStatus::Stalled {
147                error: format!("{}: {}", ns, err),
148                hints,
149                namespaced_errors,
150            }
151        }
152
153        output_status
154    }
155}
156
/// The aggregate status of a single object, as computed by
/// `PerWorkerHealthStatus::decide_status` and reported through a `HealthOperator`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub enum OverallStatus {
    /// No error and no `Running` update from a non-sidechannel namespace has been seen.
    Starting,
    /// At least one non-sidechannel namespace reported `Running` and there are no errors.
    Running,
    /// At least one namespace reported an error.
    Stalled {
        /// The headline error, formatted as `"<namespace>: <error>"`.
        error: String,
        /// All hints attached to the stalled updates.
        hints: BTreeSet<String>,
        /// The error kept for each namespace that reported one.
        namespaced_errors: BTreeMap<StatusNamespace, String>,
    },
    // NOTE(review): nothing in the visible part of this file constructs `Ceased`;
    // `decide_status` reports ceased updates as `Stalled`.
    Ceased {
        error: String,
    },
}
170
171impl OverallStatus {
172    /// The user-readable error string, if there is one.
173    pub(crate) fn error(&self) -> Option<&str> {
174        match self {
175            OverallStatus::Starting | OverallStatus::Running => None,
176            OverallStatus::Stalled { error, .. } | OverallStatus::Ceased { error, .. } => {
177                Some(error)
178            }
179        }
180    }
181
182    /// A set of namespaced errors, if there are any.
183    pub(crate) fn errors(&self) -> Option<&BTreeMap<StatusNamespace, String>> {
184        match self {
185            OverallStatus::Starting | OverallStatus::Running | OverallStatus::Ceased { .. } => None,
186            OverallStatus::Stalled {
187                namespaced_errors, ..
188            } => Some(namespaced_errors),
189        }
190    }
191
192    /// A set of hints, if there are any.
193    pub(crate) fn hints(&self) -> BTreeSet<String> {
194        match self {
195            OverallStatus::Starting | OverallStatus::Running | OverallStatus::Ceased { .. } => {
196                BTreeSet::new()
197            }
198            OverallStatus::Stalled { hints, .. } => hints.clone(),
199        }
200    }
201}
202
203impl<'a> From<&'a OverallStatus> for Status {
204    fn from(val: &'a OverallStatus) -> Self {
205        match val {
206            OverallStatus::Starting => Status::Starting,
207            OverallStatus::Running => Status::Running,
208            OverallStatus::Stalled { .. } => Status::Stalled,
209            OverallStatus::Ceased { .. } => Status::Ceased,
210        }
211    }
212}
213
/// The bookkeeping that `health_operator` keeps for a single object id.
#[derive(Debug)]
struct HealthState {
    /// The updates received so far, per worker and namespace.
    healths: PerWorkerHealthStatus,
    /// The last status handed to `HealthOperator::record_new_status`, used to report only
    /// transitions. `None` until something has been reported.
    last_reported_status: Option<OverallStatus>,
    /// The most recent update (with its namespace) for which `should_halt()` was true, if any.
    halt_with: Option<(StatusNamespace, HealthStatusUpdate)>,
}
220
221impl HealthState {
222    fn new(worker_count: usize) -> HealthState {
223        HealthState {
224            healths: PerWorkerHealthStatus {
225                errors_by_worker: vec![Default::default(); worker_count],
226            },
227            last_reported_status: None,
228            halt_with: None,
229        }
230    }
231}
232
/// A trait that lets a user configure the `health_operator` with custom
/// behavior. This is mostly useful for testing, and the [`DefaultWriter`]
/// should be the correct implementation for everyone.
pub trait HealthOperator {
    /// Record a new status.
    ///
    /// `collection_id` is the object the status is about and `ts` the time of the report.
    /// As called by `health_operator`, `new_error`, `hints` and `namespaced_errors` are taken
    /// from the `OverallStatus` being reported.
    fn record_new_status(
        &self,
        collection_id: GlobalId,
        ts: DateTime<Utc>,
        new_status: Status,
        new_error: Option<&str>,
        hints: &BTreeSet<String>,
        namespaced_errors: &BTreeMap<StatusNamespace, String>,
        // TODO(guswynn): not urgent:
        // Ideally this would be entirely included in the `DefaultWriter`, but that
        // requires a fairly heavy change to the `health_operator`, which hardcodes
        // some use of persist. For now we just leave it and ignore it in tests.
        write_namespaced_map: bool,
    );

    /// Request a halt on behalf of `id`. `error` is the namespace and update that asked for
    /// the halt, if there is one.
    fn send_halt(&self, id: GlobalId, error: Option<(StatusNamespace, HealthStatusUpdate)>);

    /// Optionally override the chosen worker index. Default is semi-random.
    /// Only useful for tests.
    fn chosen_worker(&self) -> Option<usize> {
        None
    }
}
260
/// A default `HealthOperator` for use in normal cases.
pub struct DefaultWriter {
    /// Sender used by `send_halt` to issue `InternalStorageCommand::SuspendAndRestart`.
    pub command_tx: InternalCommandSender,
    /// Shared buffer that `record_new_status` appends a `StatusUpdate` to.
    pub updates: Rc<RefCell<Vec<StatusUpdate>>>,
}
266
267impl HealthOperator for DefaultWriter {
268    fn record_new_status(
269        &self,
270        collection_id: GlobalId,
271        ts: DateTime<Utc>,
272        status: Status,
273        new_error: Option<&str>,
274        hints: &BTreeSet<String>,
275        namespaced_errors: &BTreeMap<StatusNamespace, String>,
276        write_namespaced_map: bool,
277    ) {
278        self.updates.borrow_mut().push(StatusUpdate {
279            id: collection_id,
280            timestamp: ts,
281            status,
282            error: new_error.map(|e| e.to_string()),
283            hints: hints.clone(),
284            namespaced_errors: if write_namespaced_map {
285                namespaced_errors
286                    .iter()
287                    .map(|(ns, val)| (ns.to_string(), val.clone()))
288                    .collect()
289            } else {
290                BTreeMap::new()
291            },
292            replica_id: None,
293        });
294    }
295
296    fn send_halt(&self, id: GlobalId, error: Option<(StatusNamespace, HealthStatusUpdate)>) {
297        self.command_tx
298            .send(InternalStorageCommand::SuspendAndRestart {
299                // Suspend and restart is expected to operate on the primary object and
300                // not any of the sub-objects
301                id,
302                reason: format!("{:?}", error),
303            });
304    }
305}
306
/// A health message consumed by the `health_operator`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub struct HealthStatusMessage {
    /// The object that this status message is about. When None, it refers to the entire ingestion
    /// as a whole. When Some, it refers to a specific subsource.
    ///
    /// The `health_operator` attributes a `None` id to its `halting_id`.
    pub id: Option<GlobalId>,
    /// The namespace of the health update.
    pub namespace: StatusNamespace,
    /// The update itself.
    pub update: HealthStatusUpdate,
}
318
319/// Writes updates that come across `health_stream` to the collection's status shards, as identified
320/// by their `CollectionMetadata`.
321///
322/// Only one worker will be active and write to the status shard.
323///
324/// The `OutputIndex` values that come across `health_stream` must be a strict subset of those in
325/// `configs`'s keys.
326pub(crate) fn health_operator<G, P>(
327    scope: &G,
328    now: NowFn,
329    // A set of id's that should be marked as `HealthStatusUpdate::starting()` during startup.
330    mark_starting: BTreeSet<GlobalId>,
331    // An id that is allowed to halt the dataflow. Others are ignored, and panic during debug
332    // mode.
333    halting_id: GlobalId,
334    // A description of the object type we are writing status updates about. Used in log lines.
335    object_type: &'static str,
336    // An indexed stream of health updates. Indexes are configured in `configs`.
337    health_stream: &Stream<G, HealthStatusMessage>,
338    // An impl of `HealthOperator` that configures the output behavior of this operator.
339    health_operator_impl: P,
340    // Whether or not we should actually write namespaced errors in the `details` column.
341    write_namespaced_map: bool,
342    // How long to wait before initiating a `SuspendAndRestart` command, to
343    // prevent hot restart loops.
344    suspend_and_restart_delay: Duration,
345) -> PressOnDropButton
346where
347    G: Scope,
348    P: HealthOperator + 'static,
349{
350    // Derived config options
351    let healthcheck_worker_id = scope.index();
352    let worker_count = scope.peers();
353
354    // Inject the originating worker id to each item before exchanging to the chosen worker
355    let health_stream = health_stream.map(move |status| (healthcheck_worker_id, status));
356
357    let chosen_worker_id = if let Some(index) = health_operator_impl.chosen_worker() {
358        index
359    } else {
360        // We'll route all the work to a single arbitrary worker;
361        // there's not much to do, and we need a global view.
362        usize::cast_from(mark_starting.iter().next().hashed()) % worker_count
363    };
364
365    let is_active_worker = chosen_worker_id == healthcheck_worker_id;
366
367    let operator_name = format!("healthcheck({})", healthcheck_worker_id);
368    let mut health_op = AsyncOperatorBuilder::new(operator_name, scope.clone());
369
370    let mut input = health_op.new_disconnected_input(
371        &health_stream,
372        Exchange::new(move |_| u64::cast_from(chosen_worker_id)),
373    );
374
375    let button = health_op.build(move |mut _capabilities| async move {
376        let mut health_states: BTreeMap<_, _> = mark_starting
377            .iter()
378            .copied()
379            .chain([halting_id])
380            .map(|id| (id, HealthState::new(worker_count)))
381            .collect();
382
383        // Write the initial starting state to the status shard for all managed objects
384        if is_active_worker {
385            for (id, state) in health_states.iter_mut() {
386                if mark_starting.contains(id) {
387                    let status = OverallStatus::Starting;
388                    let timestamp = mz_ore::now::to_datetime(now());
389                    health_operator_impl.record_new_status(
390                        *id,
391                        timestamp,
392                        (&status).into(),
393                        status.error(),
394                        &status.hints(),
395                        status.errors().unwrap_or(&BTreeMap::new()),
396                        write_namespaced_map,
397                    );
398
399                    state.last_reported_status = Some(status);
400                }
401            }
402        }
403
404        let mut outputs_seen = BTreeMap::<GlobalId, BTreeSet<_>>::new();
405        while let Some(event) = input.next().await {
406            if let AsyncEvent::Data(_cap, rows) = event {
407                for (worker_id, message) in rows {
408                    let HealthStatusMessage {
409                        id,
410                        namespace: ns,
411                        update: health_event,
412                    } = message;
413                    let id = id.unwrap_or(halting_id);
414                    let HealthState {
415                        healths, halt_with, ..
416                    } = match health_states.get_mut(&id) {
417                        Some(health) => health,
418                        // This is a health status update for a sub-object_type that we did not request to
419                        // be generated, which means it doesn't have a GlobalId and should not be
420                        // propagated to the shard.
421                        None => continue,
422                    };
423
424                    // Its important to track `new_round` per-namespace, so namespaces are reasoned
425                    // about in `merge_update` independently.
426                    let new_round = outputs_seen
427                        .entry(id)
428                        .or_insert_with(BTreeSet::new)
429                        .insert(ns.clone());
430
431                    if !is_active_worker {
432                        error!(
433                            "Health messages for {object_type} {id} passed to \
434                              an unexpected worker id: {healthcheck_worker_id}"
435                        )
436                    }
437
438                    if health_event.should_halt() {
439                        *halt_with = Some((ns.clone(), health_event.clone()));
440                    }
441
442                    healths.merge_update(worker_id, ns, health_event, !new_round);
443                }
444
445                let mut halt_with_outer = None;
446
447                while let Some((id, _)) = outputs_seen.pop_first() {
448                    let HealthState {
449                        healths,
450                        last_reported_status,
451                        halt_with,
452                    } = health_states.get_mut(&id).expect("known to exist");
453
454                    let new_status = healths.decide_status();
455
456                    if Some(&new_status) != last_reported_status.as_ref() {
457                        info!(
458                            "Health transition for {object_type} {id}: \
459                                  {last_reported_status:?} -> {:?}",
460                            Some(&new_status)
461                        );
462
463                        let timestamp = mz_ore::now::to_datetime(now());
464                        health_operator_impl.record_new_status(
465                            id,
466                            timestamp,
467                            (&new_status).into(),
468                            new_status.error(),
469                            &new_status.hints(),
470                            new_status.errors().unwrap_or(&BTreeMap::new()),
471                            write_namespaced_map,
472                        );
473
474                        *last_reported_status = Some(new_status.clone());
475                    }
476
477                    // Set halt with if None.
478                    if halt_with_outer.is_none() && halt_with.is_some() {
479                        halt_with_outer = Some((id, halt_with.clone()));
480                    }
481                }
482
483                // TODO(aljoscha): Instead of threading through the
484                // `should_halt` bit, we can give an internal command sender
485                // directly to the places where `should_halt = true` originates.
486                // We should definitely do that, but this is okay for a PoC.
487                if let Some((id, halt_with)) = halt_with_outer {
488                    mz_ore::soft_assert_or_log!(
489                        id == halting_id,
490                        "sub{object_type}s should not produce \
491                        halting errors, however {:?} halted while primary \
492                                            {object_type} is {:?}",
493                        id,
494                        halting_id
495                    );
496
497                    info!(
498                        "Broadcasting suspend-and-restart \
499                        command because of {:?} after {:?} delay",
500                        halt_with, suspend_and_restart_delay
501                    );
502                    tokio::time::sleep(suspend_and_restart_delay).await;
503                    health_operator_impl.send_halt(id, halt_with);
504                }
505            }
506        }
507    });
508
509    button.press_on_drop()
510}
511
512use serde::{Deserialize, Serialize};
513
/// A health update reported by one worker for one namespace.
///
/// NB: we derive Ord here, so the enum order matters. Generally, statuses later in the list
/// take precedence over earlier ones: so if one worker is stalled, we'll consider the entire
/// source to be stalled.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub enum HealthStatusUpdate {
    /// The reporter is running.
    Running,
    /// The reporter hit an error.
    Stalled {
        /// The error message.
        error: String,
        /// An optional hint that accompanies the error.
        hint: Option<String>,
        /// Whether this update asks for the dataflow to be halted and restarted.
        should_halt: bool,
    },
    /// Currently aggregated exactly like an error by `decide_status`.
    Ceased {
        /// The error message.
        error: String,
    },
}
529
530impl HealthStatusUpdate {
531    /// Generates a running [`HealthStatusUpdate`].
532    pub(crate) fn running() -> Self {
533        HealthStatusUpdate::Running
534    }
535
536    /// Generates a non-halting [`HealthStatusUpdate`] with `update`.
537    pub(crate) fn stalled(error: String, hint: Option<String>) -> Self {
538        HealthStatusUpdate::Stalled {
539            error,
540            hint,
541            should_halt: false,
542        }
543    }
544
545    /// Generates a halting [`HealthStatusUpdate`] with `update`.
546    pub(crate) fn halting(error: String, hint: Option<String>) -> Self {
547        HealthStatusUpdate::Stalled {
548            error,
549            hint,
550            should_halt: true,
551        }
552    }
553
554    // TODO: redesign ceased status database-issues#7687
555    // Generates a ceasing [`HealthStatusUpdate`] with `update`.
556    // pub(crate) fn ceasing(error: String) -> Self {
557    //     HealthStatusUpdate::Ceased { error }
558    // }
559
560    /// Whether or not we should halt the dataflow instances and restart it.
561    pub(crate) fn should_halt(&self) -> bool {
562        match self {
563            HealthStatusUpdate::Running |
564            // HealthStatusUpdate::Ceased should never halt because it can occur
565            // at the subsource level and should not cause the entire dataflow
566            // to halt. Instead, the dataflow itself should handle shutting
567            // itself down if need be.
568            HealthStatusUpdate::Ceased { .. } => false,
569            HealthStatusUpdate::Stalled { should_halt, .. } => *should_halt,
570        }
571    }
572}
573
574#[cfg(test)]
575mod tests {
576    use super::*;
577    use itertools::Itertools;
578
579    // Actual timely tests for `health_operator`.
580
581    #[mz_ore::test]
582    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
583    fn test_health_operator_basic() {
584        use Step::*;
585
586        // Test 2 inputs across 2 workers.
587        health_operator_runner(
588            2,
589            2,
590            true,
591            vec![
592                AssertStatus(vec![
593                    // Assert both inputs started.
594                    StatusToAssert {
595                        collection_index: 0,
596                        status: Status::Starting,
597                        ..Default::default()
598                    },
599                    StatusToAssert {
600                        collection_index: 1,
601                        status: Status::Starting,
602                        ..Default::default()
603                    },
604                ]),
605                // Update and assert one is running.
606                Update(TestUpdate {
607                    worker_id: 1,
608                    namespace: StatusNamespace::Generator,
609                    id: None,
610                    update: HealthStatusUpdate::running(),
611                }),
612                AssertStatus(vec![StatusToAssert {
613                    collection_index: 0,
614                    status: Status::Running,
615                    ..Default::default()
616                }]),
617                // Assert the other can be stalled by 1 worker.
618                //
619                // TODO(guswynn): ideally we could push these updates
620                // at the same time, but because they are coming from separately
621                // workers, they could end up in different rounds, causing flakes.
622                // For now, we just do this.
623                Update(TestUpdate {
624                    worker_id: 1,
625                    namespace: StatusNamespace::Generator,
626                    id: Some(GlobalId::User(1)),
627                    update: HealthStatusUpdate::running(),
628                }),
629                AssertStatus(vec![StatusToAssert {
630                    collection_index: 1,
631                    status: Status::Running,
632                    ..Default::default()
633                }]),
634                Update(TestUpdate {
635                    worker_id: 0,
636                    namespace: StatusNamespace::Generator,
637                    id: Some(GlobalId::User(1)),
638                    update: HealthStatusUpdate::stalled("uhoh".to_string(), None),
639                }),
640                AssertStatus(vec![StatusToAssert {
641                    collection_index: 1,
642                    status: Status::Stalled,
643                    error: Some("generator: uhoh".to_string()),
644                    errors: Some("generator: uhoh".to_string()),
645                    ..Default::default()
646                }]),
647                // And that it can recover.
648                Update(TestUpdate {
649                    worker_id: 0,
650                    namespace: StatusNamespace::Generator,
651                    id: Some(GlobalId::User(1)),
652                    update: HealthStatusUpdate::running(),
653                }),
654                AssertStatus(vec![StatusToAssert {
655                    collection_index: 1,
656                    status: Status::Running,
657                    ..Default::default()
658                }]),
659            ],
660        );
661    }
662
663    #[mz_ore::test]
664    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
665    fn test_health_operator_write_namespaced_map() {
666        use Step::*;
667
668        // Test 2 inputs across 2 workers.
669        health_operator_runner(
670            2,
671            2,
672            // testing this
673            false,
674            vec![
675                AssertStatus(vec![
676                    // Assert both inputs started.
677                    StatusToAssert {
678                        collection_index: 0,
679                        status: Status::Starting,
680                        ..Default::default()
681                    },
682                    StatusToAssert {
683                        collection_index: 1,
684                        status: Status::Starting,
685                        ..Default::default()
686                    },
687                ]),
688                Update(TestUpdate {
689                    worker_id: 0,
690                    namespace: StatusNamespace::Generator,
691                    id: Some(GlobalId::User(1)),
692                    update: HealthStatusUpdate::stalled("uhoh".to_string(), None),
693                }),
694                AssertStatus(vec![StatusToAssert {
695                    collection_index: 1,
696                    status: Status::Stalled,
697                    error: Some("generator: uhoh".to_string()),
698                    errors: None,
699                    ..Default::default()
700                }]),
701            ],
702        )
703    }
704
705    #[mz_ore::test]
706    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
707    fn test_health_operator_namespaces() {
708        use Step::*;
709
710        // Test 2 inputs across 2 workers.
711        health_operator_runner(
712            2,
713            1,
714            true,
715            vec![
716                AssertStatus(vec![
717                    // Assert both inputs started.
718                    StatusToAssert {
719                        collection_index: 0,
720                        status: Status::Starting,
721                        ..Default::default()
722                    },
723                ]),
724                // Assert that we merge namespaced errors correctly.
725                //
726                // Note that these all happen on the same worker id.
727                Update(TestUpdate {
728                    worker_id: 0,
729                    namespace: StatusNamespace::Generator,
730                    id: None,
731                    update: HealthStatusUpdate::stalled("uhoh".to_string(), None),
732                }),
733                AssertStatus(vec![StatusToAssert {
734                    collection_index: 0,
735                    status: Status::Stalled,
736                    error: Some("generator: uhoh".to_string()),
737                    errors: Some("generator: uhoh".to_string()),
738                    ..Default::default()
739                }]),
740                Update(TestUpdate {
741                    worker_id: 0,
742                    namespace: StatusNamespace::Kafka,
743                    id: None,
744                    update: HealthStatusUpdate::stalled("uhoh".to_string(), None),
745                }),
746                AssertStatus(vec![StatusToAssert {
747                    collection_index: 0,
748                    status: Status::Stalled,
749                    error: Some("kafka: uhoh".to_string()),
750                    errors: Some("generator: uhoh, kafka: uhoh".to_string()),
751                    ..Default::default()
752                }]),
753                // And that it can recover.
754                Update(TestUpdate {
755                    worker_id: 0,
756                    namespace: StatusNamespace::Kafka,
757                    id: None,
758                    update: HealthStatusUpdate::running(),
759                }),
760                AssertStatus(vec![StatusToAssert {
761                    collection_index: 0,
762                    status: Status::Stalled,
763                    error: Some("generator: uhoh".to_string()),
764                    errors: Some("generator: uhoh".to_string()),
765                    ..Default::default()
766                }]),
767                Update(TestUpdate {
768                    worker_id: 0,
769                    namespace: StatusNamespace::Generator,
770                    id: None,
771                    update: HealthStatusUpdate::running(),
772                }),
773                AssertStatus(vec![StatusToAssert {
774                    collection_index: 0,
775                    status: Status::Running,
776                    ..Default::default()
777                }]),
778            ],
779        );
780    }
781
782    #[mz_ore::test]
783    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
784    fn test_health_operator_namespace_side_channel() {
785        use Step::*;
786
787        health_operator_runner(
788            2,
789            1,
790            true,
791            vec![
792                AssertStatus(vec![
793                    // Assert both inputs started.
794                    StatusToAssert {
795                        collection_index: 0,
796                        status: Status::Starting,
797                        ..Default::default()
798                    },
799                ]),
800                // Assert that sidechannel namespaces don't downgrade the status
801                //
802                // Note that these all happen on the same worker id.
803                Update(TestUpdate {
804                    worker_id: 0,
805                    namespace: StatusNamespace::Ssh,
806                    id: None,
807                    update: HealthStatusUpdate::stalled("uhoh".to_string(), None),
808                }),
809                AssertStatus(vec![StatusToAssert {
810                    collection_index: 0,
811                    status: Status::Stalled,
812                    error: Some("ssh: uhoh".to_string()),
813                    errors: Some("ssh: uhoh".to_string()),
814                    ..Default::default()
815                }]),
816                Update(TestUpdate {
817                    worker_id: 0,
818                    namespace: StatusNamespace::Ssh,
819                    id: None,
820                    update: HealthStatusUpdate::stalled("uhoh2".to_string(), None),
821                }),
822                AssertStatus(vec![StatusToAssert {
823                    collection_index: 0,
824                    status: Status::Stalled,
825                    error: Some("ssh: uhoh2".to_string()),
826                    errors: Some("ssh: uhoh2".to_string()),
827                    ..Default::default()
828                }]),
829                Update(TestUpdate {
830                    worker_id: 0,
831                    namespace: StatusNamespace::Ssh,
832                    id: None,
833                    update: HealthStatusUpdate::running(),
834                }),
835                // We haven't starting running yet, as a `Default` namespace hasn't told us.
836                AssertStatus(vec![StatusToAssert {
837                    collection_index: 0,
838                    status: Status::Starting,
839                    ..Default::default()
840                }]),
841                Update(TestUpdate {
842                    worker_id: 0,
843                    namespace: StatusNamespace::Generator,
844                    id: None,
845                    update: HealthStatusUpdate::running(),
846                }),
847                AssertStatus(vec![StatusToAssert {
848                    collection_index: 0,
849                    status: Status::Running,
850                    ..Default::default()
851                }]),
852            ],
853        );
854    }
855
856    #[mz_ore::test]
857    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
858    fn test_health_operator_hints() {
859        use Step::*;
860
861        health_operator_runner(
862            2,
863            1,
864            true,
865            vec![
866                AssertStatus(vec![
867                    // Assert both inputs started.
868                    StatusToAssert {
869                        collection_index: 0,
870                        status: Status::Starting,
871                        ..Default::default()
872                    },
873                ]),
874                // Note that these all happen across worker ids.
875                Update(TestUpdate {
876                    worker_id: 0,
877                    namespace: StatusNamespace::Generator,
878                    id: None,
879                    update: HealthStatusUpdate::stalled(
880                        "uhoh".to_string(),
881                        Some("hint1".to_string()),
882                    ),
883                }),
884                AssertStatus(vec![StatusToAssert {
885                    collection_index: 0,
886                    status: Status::Stalled,
887                    error: Some("generator: uhoh".to_string()),
888                    errors: Some("generator: uhoh".to_string()),
889                    hint: Some("hint1".to_string()),
890                }]),
891                Update(TestUpdate {
892                    worker_id: 1,
893                    namespace: StatusNamespace::Generator,
894                    id: None,
895                    update: HealthStatusUpdate::stalled(
896                        "uhoh2".to_string(),
897                        Some("hint2".to_string()),
898                    ),
899                }),
900                AssertStatus(vec![StatusToAssert {
901                    collection_index: 0,
902                    status: Status::Stalled,
903                    // Note the error sorts later so we just use that.
904                    error: Some("generator: uhoh2".to_string()),
905                    errors: Some("generator: uhoh2".to_string()),
906                    hint: Some("hint1, hint2".to_string()),
907                }]),
908                // Update one of the hints
909                Update(TestUpdate {
910                    worker_id: 1,
911                    namespace: StatusNamespace::Generator,
912                    id: None,
913                    update: HealthStatusUpdate::stalled(
914                        "uhoh2".to_string(),
915                        Some("hint3".to_string()),
916                    ),
917                }),
918                AssertStatus(vec![StatusToAssert {
919                    collection_index: 0,
920                    status: Status::Stalled,
921                    // Note the error sorts later so we just use that.
922                    error: Some("generator: uhoh2".to_string()),
923                    errors: Some("generator: uhoh2".to_string()),
924                    hint: Some("hint1, hint3".to_string()),
925                }]),
926                // Assert recovery.
927                Update(TestUpdate {
928                    worker_id: 0,
929                    namespace: StatusNamespace::Generator,
930                    id: None,
931                    update: HealthStatusUpdate::running(),
932                }),
933                AssertStatus(vec![StatusToAssert {
934                    collection_index: 0,
935                    status: Status::Stalled,
936                    // Note the error sorts later so we just use that.
937                    error: Some("generator: uhoh2".to_string()),
938                    errors: Some("generator: uhoh2".to_string()),
939                    hint: Some("hint3".to_string()),
940                }]),
941                Update(TestUpdate {
942                    worker_id: 1,
943                    namespace: StatusNamespace::Generator,
944                    id: None,
945                    update: HealthStatusUpdate::running(),
946                }),
947                AssertStatus(vec![StatusToAssert {
948                    collection_index: 0,
949                    status: Status::Running,
950                    ..Default::default()
951                }]),
952            ],
953        );
954    }
955
956    // The below is ALL test infrastructure for the above
957
958    use mz_ore::assert_err;
959    use timely::container::CapacityContainerBuilder;
960    use timely::dataflow::Scope;
961    use timely::dataflow::operators::Enter;
962    use timely::dataflow::operators::exchange::Exchange;
963    use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
964
965    /// A status to assert.
966    #[derive(Debug, Clone, PartialEq, Eq)]
967    struct StatusToAssert {
968        collection_index: usize,
969        status: Status,
970        error: Option<String>,
971        errors: Option<String>,
972        hint: Option<String>,
973    }
974
975    impl Default for StatusToAssert {
976        fn default() -> Self {
977            StatusToAssert {
978                collection_index: Default::default(),
979                status: Status::Running,
980                error: Default::default(),
981                errors: Default::default(),
982                hint: Default::default(),
983            }
984        }
985    }
986
987    /// An update to push into the operator.
988    /// Can come from any worker, and from any input.
989    #[derive(Debug, Clone)]
990    struct TestUpdate {
991        worker_id: u64,
992        namespace: StatusNamespace,
993        id: Option<GlobalId>,
994        update: HealthStatusUpdate,
995    }
996
997    #[derive(Debug, Clone)]
998    enum Step {
999        /// Insert a new health update.
1000        Update(TestUpdate),
1001        /// Assert a set of outputs. Note that these should
1002        /// have unique `collection_index`'s
1003        AssertStatus(Vec<StatusToAssert>),
1004    }
1005
1006    struct TestWriter {
1007        sender: UnboundedSender<StatusToAssert>,
1008        input_mapping: BTreeMap<GlobalId, usize>,
1009    }
1010
1011    impl HealthOperator for TestWriter {
1012        fn record_new_status(
1013            &self,
1014            collection_id: GlobalId,
1015            _ts: DateTime<Utc>,
1016            status: Status,
1017            new_error: Option<&str>,
1018            hints: &BTreeSet<String>,
1019            namespaced_errors: &BTreeMap<StatusNamespace, String>,
1020            write_namespaced_map: bool,
1021        ) {
1022            let _ = self.sender.send(StatusToAssert {
1023                collection_index: *self.input_mapping.get(&collection_id).unwrap(),
1024                status,
1025                error: new_error.map(str::to_string),
1026                errors: if !namespaced_errors.is_empty() && write_namespaced_map {
1027                    Some(
1028                        namespaced_errors
1029                            .iter()
1030                            .map(|(ns, err)| format!("{}: {}", ns, err))
1031                            .join(", "),
1032                    )
1033                } else {
1034                    None
1035                },
1036                hint: if !hints.is_empty() {
1037                    Some(hints.iter().join(", "))
1038                } else {
1039                    None
1040                },
1041            });
1042        }
1043
1044        fn send_halt(&self, _id: GlobalId, _error: Option<(StatusNamespace, HealthStatusUpdate)>) {
1045            // Not yet unit-tested
1046            unimplemented!()
1047        }
1048
1049        fn chosen_worker(&self) -> Option<usize> {
1050            // We input and assert outputs on the first worker.
1051            Some(0)
1052        }
1053    }
1054
1055    /// Setup a `health_operator` with a set number of workers and inputs, and the
1056    /// steps on the first worker.
1057    fn health_operator_runner(
1058        workers: usize,
1059        inputs: usize,
1060        write_namespaced_map: bool,
1061        steps: Vec<Step>,
1062    ) {
1063        let tokio_runtime = tokio::runtime::Runtime::new().unwrap();
1064        let tokio_handle = tokio_runtime.handle().clone();
1065
1066        let inputs: BTreeMap<GlobalId, usize> = (0..inputs)
1067            .map(|index| (GlobalId::User(u64::cast_from(index)), index))
1068            .collect();
1069
1070        timely::execute::execute(
1071            timely::execute::Config {
1072                communication: timely::CommunicationConfig::Process(workers),
1073                worker: Default::default(),
1074            },
1075            move |worker| {
1076                let steps = steps.clone();
1077                let inputs = inputs.clone();
1078
1079                let _tokio_guard = tokio_handle.enter();
1080                let (in_tx, in_rx) = unbounded_channel();
1081                let (out_tx, mut out_rx) = unbounded_channel();
1082
1083                worker.dataflow::<(), _, _>(|root_scope| {
1084                    root_scope
1085                        .clone()
1086                        .scoped::<mz_repr::Timestamp, _, _>("gus", |scope| {
1087                            let input = producer(root_scope.clone(), in_rx).enter(scope);
1088                            Box::leak(Box::new(health_operator(
1089                                scope,
1090                                mz_ore::now::SYSTEM_TIME.clone(),
1091                                inputs.keys().copied().collect(),
1092                                *inputs.first_key_value().unwrap().0,
1093                                "source_test",
1094                                &input,
1095                                TestWriter {
1096                                    sender: out_tx,
1097                                    input_mapping: inputs,
1098                                },
1099                                write_namespaced_map,
1100                                Duration::from_secs(5),
1101                            )));
1102                        });
1103                });
1104
1105                // We arbitrarily do all the testing on the first worker.
1106                if worker.index() == 0 {
1107                    use Step::*;
1108                    for step in steps {
1109                        match step {
1110                            Update(update) => {
1111                                let _ = in_tx.send(update);
1112                            }
1113                            AssertStatus(mut statuses) => loop {
1114                                match out_rx.try_recv() {
1115                                    Err(_) => {
1116                                        worker.step();
1117                                        // This makes testing easier.
1118                                        std::thread::sleep(std::time::Duration::from_millis(50));
1119                                    }
1120                                    Ok(update) => {
1121                                        let pos = statuses
1122                                            .iter()
1123                                            .position(|s| {
1124                                                s.collection_index == update.collection_index
1125                                            })
1126                                            .unwrap();
1127
1128                                        let status_to_assert = &statuses[pos];
1129                                        assert_eq!(&update, status_to_assert);
1130
1131                                        statuses.remove(pos);
1132                                        if statuses.is_empty() {
1133                                            break;
1134                                        }
1135                                    }
1136                                }
1137                            },
1138                        }
1139                    }
1140
1141                    // Assert that nothing is left in the channel.
1142                    assert_err!(out_rx.try_recv());
1143                }
1144            },
1145        )
1146        .unwrap();
1147    }
1148
1149    /// Produces (input_index, HealthStatusUpdate)'s based on the input channel.
1150    ///
1151    /// Only the first worker is used, all others immediately drop their capabilities and channels.
1152    /// After the channel is empty on the first worker, then the frontier will go to [].
1153    /// Also ensures that updates are routed to the correct worker based on the `TestUpdate`
1154    /// using an exchange.
1155    fn producer<G: Scope<Timestamp = ()>>(
1156        scope: G,
1157        mut input: UnboundedReceiver<TestUpdate>,
1158    ) -> Stream<G, HealthStatusMessage> {
1159        let mut iterator = AsyncOperatorBuilder::new("iterator".to_string(), scope.clone());
1160        let (output_handle, output) = iterator.new_output::<CapacityContainerBuilder<Vec<_>>>();
1161
1162        let index = scope.index();
1163        iterator.build(|mut caps| async move {
1164            // We input and assert outputs on the first worker.
1165            if index != 0 {
1166                return;
1167            }
1168            let mut capability = Some(caps.pop().unwrap());
1169            while let Some(element) = input.recv().await {
1170                output_handle.give(
1171                    capability.as_ref().unwrap(),
1172                    (
1173                        element.worker_id,
1174                        element.id,
1175                        element.namespace,
1176                        element.update,
1177                    ),
1178                );
1179            }
1180
1181            capability.take();
1182        });
1183
1184        let output = output.exchange(|d| d.0).map(|d| HealthStatusMessage {
1185            id: d.1,
1186            namespace: d.2,
1187            update: d.3,
1188        });
1189
1190        output
1191    }
1192}