mz_storage/healthcheck.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Healthcheck common

use std::cell::RefCell;
use std::collections::btree_map::Entry;
use std::collections::{BTreeMap, BTreeSet};
use std::fmt;
use std::fmt::Debug;
use std::rc::Rc;
use std::time::Duration;

use chrono::{DateTime, Utc};
use differential_dataflow::Hashable;
use futures::StreamExt;
use mz_ore::cast::CastFrom;
use mz_ore::now::NowFn;
use mz_repr::GlobalId;
use mz_storage_client::client::{Status, StatusUpdate};
use mz_timely_util::builder_async::{
    Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
};
use timely::dataflow::channels::pact::Exchange;
use timely::dataflow::operators::Map;
use timely::dataflow::{Scope, Stream};
use tracing::{error, info};

use crate::internal_control::{InternalCommandSender, InternalStorageCommand};

/// The namespace of the update. The `Ord` impl matters here: later variants are
/// displayed over earlier ones.
///
/// Some namespaces (referred to as "sidechannels") can come from any worker_id,
/// and `Running` statuses from them do not mark the entire object as running.
///
/// Ensure you update `is_sidechannel` when adding variants.
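///
/// For example, because `Ord` is derived from declaration order, a later variant
/// such as `Internal` takes display precedence over an earlier one such as
/// `Kafka` (illustrative sketch, not a doctest):
///
/// ```ignore
/// assert!(StatusNamespace::Internal > StatusNamespace::Kafka);
/// ```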
#[derive(Copy, Clone, Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub enum StatusNamespace {
    /// A normal status namespace. Any `Running` status from any worker will mark the object
    /// `Running`.
    Generator,
    Kafka,
    Postgres,
    MySql,
    SqlServer,
    Ssh,
    Upsert,
    Decode,
    Iceberg,
    Internal,
}

impl StatusNamespace {
    fn is_sidechannel(&self) -> bool {
        matches!(self, StatusNamespace::Ssh)
    }
}

impl fmt::Display for StatusNamespace {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        use StatusNamespace::*;
        match self {
            Generator => write!(f, "generator"),
            Kafka => write!(f, "kafka"),
            Postgres => write!(f, "postgres"),
            MySql => write!(f, "mysql"),
            SqlServer => write!(f, "sql-server"),
            Ssh => write!(f, "ssh"),
            Upsert => write!(f, "upsert"),
            Decode => write!(f, "decode"),
            Internal => write!(f, "internal"),
            Iceberg => write!(f, "iceberg"),
        }
    }
}

#[derive(Debug)]
struct PerWorkerHealthStatus {
    pub(crate) errors_by_worker: Vec<BTreeMap<StatusNamespace, HealthStatusUpdate>>,
}

impl PerWorkerHealthStatus {
    fn merge_update(
        &mut self,
        worker: usize,
        namespace: StatusNamespace,
        update: HealthStatusUpdate,
        only_greater: bool,
    ) {
        let errors = &mut self.errors_by_worker[worker];
        match errors.entry(namespace) {
            Entry::Vacant(v) => {
                v.insert(update);
            }
            Entry::Occupied(mut o) => {
                if !only_greater || o.get() < &update {
                    o.insert(update);
                }
            }
        }
    }

    fn decide_status(&self) -> OverallStatus {
        let mut output_status = OverallStatus::Starting;
        let mut namespaced_errors: BTreeMap<StatusNamespace, String> = BTreeMap::new();
        let mut hints: BTreeSet<String> = BTreeSet::new();

        for status in self.errors_by_worker.iter() {
            for (ns, ns_status) in status.iter() {
                match ns_status {
                    // HealthStatusUpdate::Ceased is currently unused, so just
                    // treat it as if it were a normal error.
                    //
                    // TODO: redesign ceased status database-issues#7687
                    HealthStatusUpdate::Ceased { error } => {
                        if Some(error) > namespaced_errors.get(ns).as_deref() {
                            namespaced_errors.insert(*ns, error.to_string());
                        }
                    }
                    HealthStatusUpdate::Stalled { error, hint, .. } => {
                        if Some(error) > namespaced_errors.get(ns).as_deref() {
                            namespaced_errors.insert(*ns, error.to_string());
                        }

                        if let Some(hint) = hint {
                            hints.insert(hint.to_string());
                        }
                    }
                    HealthStatusUpdate::Running => {
                        if !ns.is_sidechannel() {
                            output_status = OverallStatus::Running;
                        }
                    }
                }
            }
        }

        if !namespaced_errors.is_empty() {
            // Pick the most important error.
            let (ns, err) = namespaced_errors.last_key_value().unwrap();
            output_status = OverallStatus::Stalled {
                error: format!("{}: {}", ns, err),
                hints,
                namespaced_errors,
            }
        }

        output_status
    }
}

#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub enum OverallStatus {
    Starting,
    Running,
    Stalled {
        error: String,
        hints: BTreeSet<String>,
        namespaced_errors: BTreeMap<StatusNamespace, String>,
    },
    Ceased {
        error: String,
    },
}

impl OverallStatus {
    /// The user-readable error string, if there is one.
    pub(crate) fn error(&self) -> Option<&str> {
        match self {
            OverallStatus::Starting | OverallStatus::Running => None,
            OverallStatus::Stalled { error, .. } | OverallStatus::Ceased { error, .. } => {
                Some(error)
            }
        }
    }

    /// A set of namespaced errors, if there are any.
    pub(crate) fn errors(&self) -> Option<&BTreeMap<StatusNamespace, String>> {
        match self {
            OverallStatus::Starting | OverallStatus::Running | OverallStatus::Ceased { .. } => None,
            OverallStatus::Stalled {
                namespaced_errors, ..
            } => Some(namespaced_errors),
        }
    }

    /// A set of hints, if there are any.
    pub(crate) fn hints(&self) -> BTreeSet<String> {
        match self {
            OverallStatus::Starting | OverallStatus::Running | OverallStatus::Ceased { .. } => {
                BTreeSet::new()
            }
            OverallStatus::Stalled { hints, .. } => hints.clone(),
        }
    }
}

impl<'a> From<&'a OverallStatus> for Status {
    fn from(val: &'a OverallStatus) -> Self {
        match val {
            OverallStatus::Starting => Status::Starting,
            OverallStatus::Running => Status::Running,
            OverallStatus::Stalled { .. } => Status::Stalled,
            OverallStatus::Ceased { .. } => Status::Ceased,
        }
    }
}

#[derive(Debug)]
struct HealthState {
    healths: PerWorkerHealthStatus,
    last_reported_status: Option<OverallStatus>,
    halt_with: Option<(StatusNamespace, HealthStatusUpdate)>,
}

impl HealthState {
    fn new(worker_count: usize) -> HealthState {
        HealthState {
            healths: PerWorkerHealthStatus {
                errors_by_worker: vec![Default::default(); worker_count],
            },
            last_reported_status: None,
            halt_with: None,
        }
    }
}

/// A trait that lets a user configure the `health_operator` with custom
/// behavior. This is mostly useful for testing, and the [`DefaultWriter`]
/// should be the correct implementation for everyone.
pub trait HealthOperator {
    /// Record a new status.
    fn record_new_status(
        &self,
        collection_id: GlobalId,
        ts: DateTime<Utc>,
        new_status: Status,
        new_error: Option<&str>,
        hints: &BTreeSet<String>,
        namespaced_errors: &BTreeMap<StatusNamespace, String>,
        // TODO(guswynn): not urgent:
        // Ideally this would be entirely included in the `DefaultWriter`, but that
        // requires a fairly heavy change to the `health_operator`, which hardcodes
        // some use of persist. For now we just leave it and ignore it in tests.
        write_namespaced_map: bool,
    );
    fn send_halt(&self, id: GlobalId, error: Option<(StatusNamespace, HealthStatusUpdate)>);

    /// Optionally override the chosen worker index. Default is semi-random.
    /// Only useful for tests.
    fn chosen_worker(&self) -> Option<usize> {
        None
    }
}

/// A default `HealthOperator` for use in normal cases.
pub struct DefaultWriter {
    pub command_tx: InternalCommandSender,
    pub updates: Rc<RefCell<Vec<StatusUpdate>>>,
}

impl HealthOperator for DefaultWriter {
    fn record_new_status(
        &self,
        collection_id: GlobalId,
        ts: DateTime<Utc>,
        status: Status,
        new_error: Option<&str>,
        hints: &BTreeSet<String>,
        namespaced_errors: &BTreeMap<StatusNamespace, String>,
        write_namespaced_map: bool,
    ) {
        self.updates.borrow_mut().push(StatusUpdate {
            id: collection_id,
            timestamp: ts,
            status,
            error: new_error.map(|e| e.to_string()),
            hints: hints.clone(),
            namespaced_errors: if write_namespaced_map {
                namespaced_errors
                    .iter()
                    .map(|(ns, val)| (ns.to_string(), val.clone()))
                    .collect()
            } else {
                BTreeMap::new()
            },
            replica_id: None,
        });
    }

    fn send_halt(&self, id: GlobalId, error: Option<(StatusNamespace, HealthStatusUpdate)>) {
        self.command_tx
            .send(InternalStorageCommand::SuspendAndRestart {
                // Suspend and restart is expected to operate on the primary object and
                // not any of the sub-objects
                id,
                reason: format!("{:?}", error),
            });
    }
}

/// A health message consumed by the `health_operator`.
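///
/// For example, a source implementation could mark the ingestion as a whole as
/// running in the Kafka namespace with a message like the following
/// (illustrative sketch, not a doctest):
///
/// ```ignore
/// HealthStatusMessage {
///     // `None` refers to the entire ingestion rather than a specific subsource.
///     id: None,
///     namespace: StatusNamespace::Kafka,
///     update: HealthStatusUpdate::running(),
/// };
/// ```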
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub struct HealthStatusMessage {
    /// The object that this status message is about. When None, it refers to the entire ingestion
    /// as a whole. When Some, it refers to a specific subsource.
    pub id: Option<GlobalId>,
    /// The namespace of the health update.
    pub namespace: StatusNamespace,
    /// The update itself.
    pub update: HealthStatusUpdate,
}

/// Writes updates that come across `health_stream` to the collection's status shards, as identified
/// by their `CollectionMetadata`.
///
/// Only one worker will be active and write to the status shard.
///
/// The ids that come across `health_stream` must be `None` (denoting the primary object) or a
/// subset of the ids in `mark_starting`; updates for any other id are ignored.
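///
/// A minimal wiring sketch (names like `source_id`, `command_tx`, `updates`, and
/// `health_stream` are placeholders, not taken from a real call site):
///
/// ```ignore
/// let button = health_operator(
///     &scope,
///     mz_ore::now::SYSTEM_TIME.clone(),
///     // Mark the primary object as `Starting` on startup.
///     [source_id].into_iter().collect(),
///     source_id,
///     "source",
///     &health_stream,
///     DefaultWriter { command_tx, updates },
///     /* write_namespaced_map */ true,
///     Duration::from_secs(5),
/// );
/// ```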
pub(crate) fn health_operator<G, P>(
    scope: &G,
    now: NowFn,
    // A set of ids that should be marked as `Starting` during startup.
    mark_starting: BTreeSet<GlobalId>,
    // An id that is allowed to halt the dataflow. Halts requested for other ids are ignored, and
    // panic in debug mode.
    halting_id: GlobalId,
    // A description of the object type we are writing status updates about. Used in log lines.
    object_type: &'static str,
    // A stream of health updates for the primary object and its sub-objects.
    health_stream: &Stream<G, HealthStatusMessage>,
    // An impl of `HealthOperator` that configures the output behavior of this operator.
    health_operator_impl: P,
    // Whether or not we should actually write namespaced errors in the `details` column.
    write_namespaced_map: bool,
    // How long to wait before initiating a `SuspendAndRestart` command, to
    // prevent hot restart loops.
    suspend_and_restart_delay: Duration,
) -> PressOnDropButton
where
    G: Scope,
    P: HealthOperator + 'static,
{
    // Derived config options
    let healthcheck_worker_id = scope.index();
    let worker_count = scope.peers();

    // Inject the originating worker id to each item before exchanging to the chosen worker
    let health_stream = health_stream.map(move |status| (healthcheck_worker_id, status));

    let chosen_worker_id = if let Some(index) = health_operator_impl.chosen_worker() {
        index
    } else {
        // We'll route all the work to a single arbitrary worker;
        // there's not much to do, and we need a global view.
        usize::cast_from(mark_starting.iter().next().hashed()) % worker_count
    };

    let is_active_worker = chosen_worker_id == healthcheck_worker_id;

    let operator_name = format!("healthcheck({})", healthcheck_worker_id);
    let mut health_op = AsyncOperatorBuilder::new(operator_name, scope.clone());

    let mut input = health_op.new_disconnected_input(
        &health_stream,
        Exchange::new(move |_| u64::cast_from(chosen_worker_id)),
    );

    let button = health_op.build(move |mut _capabilities| async move {
        let mut health_states: BTreeMap<_, _> = mark_starting
            .iter()
            .copied()
            .chain([halting_id])
            .map(|id| (id, HealthState::new(worker_count)))
            .collect();

        // Write the initial starting state to the status shard for all managed objects
        if is_active_worker {
            for (id, state) in health_states.iter_mut() {
                let status = OverallStatus::Starting;
                let timestamp = mz_ore::now::to_datetime(now());
                health_operator_impl.record_new_status(
                    *id,
                    timestamp,
                    (&status).into(),
                    status.error(),
                    &status.hints(),
                    status.errors().unwrap_or(&BTreeMap::new()),
                    write_namespaced_map,
                );

                state.last_reported_status = Some(status);
            }
        }

        let mut outputs_seen = BTreeMap::<GlobalId, BTreeSet<_>>::new();
        while let Some(event) = input.next().await {
            if let AsyncEvent::Data(_cap, rows) = event {
                for (worker_id, message) in rows {
                    let HealthStatusMessage {
                        id,
                        namespace: ns,
                        update: health_event,
                    } = message;
                    let id = id.unwrap_or(halting_id);
                    let HealthState {
                        healths, halt_with, ..
                    } = match health_states.get_mut(&id) {
                        Some(health) => health,
                        // This is a health status update for a sub-object_type that we did not request to
                        // be generated, which means it doesn't have a GlobalId and should not be
                        // propagated to the shard.
                        None => continue,
                    };

                    // It's important to track `new_round` per-namespace, so namespaces are reasoned
                    // about in `merge_update` independently.
                    let new_round = outputs_seen
                        .entry(id)
                        .or_insert_with(BTreeSet::new)
                        .insert(ns.clone());

                    if !is_active_worker {
                        error!(
                            "Health messages for {object_type} {id} passed to \
                              an unexpected worker id: {healthcheck_worker_id}"
                        )
                    }

                    if health_event.should_halt() {
                        *halt_with = Some((ns.clone(), health_event.clone()));
                    }

                    healths.merge_update(worker_id, ns, health_event, !new_round);
                }

                let mut halt_with_outer = None;

                while let Some((id, _)) = outputs_seen.pop_first() {
                    let HealthState {
                        healths,
                        last_reported_status,
                        halt_with,
                    } = health_states.get_mut(&id).expect("known to exist");

                    let new_status = healths.decide_status();

                    if Some(&new_status) != last_reported_status.as_ref() {
                        info!(
                            "Health transition for {object_type} {id}: \
                                  {last_reported_status:?} -> {:?}",
                            Some(&new_status)
                        );

                        let timestamp = mz_ore::now::to_datetime(now());
                        health_operator_impl.record_new_status(
                            id,
                            timestamp,
                            (&new_status).into(),
                            new_status.error(),
                            &new_status.hints(),
                            new_status.errors().unwrap_or(&BTreeMap::new()),
                            write_namespaced_map,
                        );

                        *last_reported_status = Some(new_status.clone());
                    }

                    // Set halt with if None.
                    if halt_with_outer.is_none() && halt_with.is_some() {
                        halt_with_outer = Some((id, halt_with.clone()));
                    }
                }

                // TODO(aljoscha): Instead of threading through the
                // `should_halt` bit, we can give an internal command sender
                // directly to the places where `should_halt = true` originates.
                // We should definitely do that, but this is okay for a PoC.
                if let Some((id, halt_with)) = halt_with_outer {
                    mz_ore::soft_assert_or_log!(
                        id == halting_id,
                        "sub{object_type}s should not produce \
                        halting errors, however {:?} halted while primary \
                                            {object_type} is {:?}",
                        id,
                        halting_id
                    );

                    info!(
                        "Broadcasting suspend-and-restart \
                        command because of {:?} after {:?} delay",
                        halt_with, suspend_and_restart_delay
                    );
                    tokio::time::sleep(suspend_and_restart_delay).await;
                    health_operator_impl.send_halt(id, halt_with);
                }
            }
        }
    });

    button.press_on_drop()
}

use serde::{Deserialize, Serialize};

/// NB: we derive Ord here, so the enum order matters. Generally, statuses later in the list
/// take precedence over earlier ones: so if one worker is stalled, we'll consider the entire
/// source to be stalled.
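///
/// For example, when per-worker updates are merged, a `Stalled` update compares
/// greater than a `Running` one and therefore wins (illustrative sketch, not a
/// doctest):
///
/// ```ignore
/// assert!(
///     HealthStatusUpdate::stalled("uhoh".to_string(), None)
///         > HealthStatusUpdate::running()
/// );
/// ```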
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub enum HealthStatusUpdate {
    Running,
    Stalled {
        error: String,
        hint: Option<String>,
        should_halt: bool,
    },
    Ceased {
        error: String,
    },
}

impl HealthStatusUpdate {
    /// Generates a running [`HealthStatusUpdate`].
    pub(crate) fn running() -> Self {
        HealthStatusUpdate::Running
    }

    /// Generates a non-halting [`HealthStatusUpdate`] with the given `error` and `hint`.
    pub(crate) fn stalled(error: String, hint: Option<String>) -> Self {
        HealthStatusUpdate::Stalled {
            error,
            hint,
            should_halt: false,
        }
    }

    /// Generates a halting [`HealthStatusUpdate`] with the given `error` and `hint`.
    pub(crate) fn halting(error: String, hint: Option<String>) -> Self {
        HealthStatusUpdate::Stalled {
            error,
            hint,
            should_halt: true,
        }
    }

    // TODO: redesign ceased status database-issues#7687
    // Generates a ceasing [`HealthStatusUpdate`] with the given `error`.
    // pub(crate) fn ceasing(error: String) -> Self {
    //     HealthStatusUpdate::Ceased { error }
    // }

    /// Whether or not we should halt the dataflow instance and restart it.
    pub(crate) fn should_halt(&self) -> bool {
        match self {
            HealthStatusUpdate::Running |
            // HealthStatusUpdate::Ceased should never halt because it can occur
            // at the subsource level and should not cause the entire dataflow
            // to halt. Instead, the dataflow itself should handle shutting
            // itself down if need be.
            HealthStatusUpdate::Ceased { .. } => false,
            HealthStatusUpdate::Stalled { should_halt, .. } => *should_halt,
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use itertools::Itertools;

    // Actual timely tests for `health_operator`.

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
    fn test_health_operator_basic() {
        use Step::*;

        // Test 2 inputs across 2 workers.
        health_operator_runner(
            2,
            2,
            true,
            vec![
                AssertStatus(vec![
                    // Assert both inputs started.
                    StatusToAssert {
                        collection_index: 0,
                        status: Status::Starting,
                        ..Default::default()
                    },
                    StatusToAssert {
                        collection_index: 1,
                        status: Status::Starting,
                        ..Default::default()
                    },
                ]),
                // Update and assert one is running.
                Update(TestUpdate {
                    worker_id: 1,
                    namespace: StatusNamespace::Generator,
                    id: None,
                    update: HealthStatusUpdate::running(),
                }),
                AssertStatus(vec![StatusToAssert {
                    collection_index: 0,
                    status: Status::Running,
                    ..Default::default()
                }]),
                // Assert the other can be stalled by 1 worker.
                //
                // TODO(guswynn): ideally we could push these updates
                // at the same time, but because they are coming from separate
                // workers, they could end up in different rounds, causing flakes.
                // For now, we just do this.
                Update(TestUpdate {
                    worker_id: 1,
                    namespace: StatusNamespace::Generator,
                    id: Some(GlobalId::User(1)),
                    update: HealthStatusUpdate::running(),
                }),
                AssertStatus(vec![StatusToAssert {
                    collection_index: 1,
                    status: Status::Running,
                    ..Default::default()
                }]),
                Update(TestUpdate {
                    worker_id: 0,
                    namespace: StatusNamespace::Generator,
                    id: Some(GlobalId::User(1)),
                    update: HealthStatusUpdate::stalled("uhoh".to_string(), None),
                }),
                AssertStatus(vec![StatusToAssert {
                    collection_index: 1,
                    status: Status::Stalled,
                    error: Some("generator: uhoh".to_string()),
                    errors: Some("generator: uhoh".to_string()),
                    ..Default::default()
                }]),
                // And that it can recover.
                Update(TestUpdate {
                    worker_id: 0,
                    namespace: StatusNamespace::Generator,
                    id: Some(GlobalId::User(1)),
                    update: HealthStatusUpdate::running(),
                }),
                AssertStatus(vec![StatusToAssert {
                    collection_index: 1,
                    status: Status::Running,
                    ..Default::default()
                }]),
            ],
        );
    }

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
    fn test_health_operator_write_namespaced_map() {
        use Step::*;

        // Test 2 inputs across 2 workers.
        health_operator_runner(
            2,
            2,
            // testing this
            false,
            vec![
                AssertStatus(vec![
                    // Assert both inputs started.
                    StatusToAssert {
                        collection_index: 0,
                        status: Status::Starting,
                        ..Default::default()
                    },
                    StatusToAssert {
                        collection_index: 1,
                        status: Status::Starting,
                        ..Default::default()
                    },
                ]),
                Update(TestUpdate {
                    worker_id: 0,
                    namespace: StatusNamespace::Generator,
                    id: Some(GlobalId::User(1)),
                    update: HealthStatusUpdate::stalled("uhoh".to_string(), None),
                }),
                AssertStatus(vec![StatusToAssert {
                    collection_index: 1,
                    status: Status::Stalled,
                    error: Some("generator: uhoh".to_string()),
                    errors: None,
                    ..Default::default()
                }]),
            ],
        )
    }

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
    fn test_health_operator_namespaces() {
        use Step::*;

        // Test 1 input across 2 workers.
        health_operator_runner(
            2,
            1,
            true,
            vec![
                AssertStatus(vec![
                    // Assert the input started.
                    StatusToAssert {
                        collection_index: 0,
                        status: Status::Starting,
                        ..Default::default()
                    },
                ]),
                // Assert that we merge namespaced errors correctly.
                //
                // Note that these all happen on the same worker id.
                Update(TestUpdate {
                    worker_id: 0,
                    namespace: StatusNamespace::Generator,
                    id: None,
                    update: HealthStatusUpdate::stalled("uhoh".to_string(), None),
                }),
                AssertStatus(vec![StatusToAssert {
                    collection_index: 0,
                    status: Status::Stalled,
                    error: Some("generator: uhoh".to_string()),
                    errors: Some("generator: uhoh".to_string()),
                    ..Default::default()
                }]),
                Update(TestUpdate {
                    worker_id: 0,
                    namespace: StatusNamespace::Kafka,
                    id: None,
                    update: HealthStatusUpdate::stalled("uhoh".to_string(), None),
                }),
                AssertStatus(vec![StatusToAssert {
                    collection_index: 0,
                    status: Status::Stalled,
                    error: Some("kafka: uhoh".to_string()),
                    errors: Some("generator: uhoh, kafka: uhoh".to_string()),
                    ..Default::default()
                }]),
                // And that it can recover.
                Update(TestUpdate {
                    worker_id: 0,
                    namespace: StatusNamespace::Kafka,
                    id: None,
                    update: HealthStatusUpdate::running(),
                }),
                AssertStatus(vec![StatusToAssert {
                    collection_index: 0,
                    status: Status::Stalled,
                    error: Some("generator: uhoh".to_string()),
                    errors: Some("generator: uhoh".to_string()),
                    ..Default::default()
                }]),
                Update(TestUpdate {
                    worker_id: 0,
                    namespace: StatusNamespace::Generator,
                    id: None,
                    update: HealthStatusUpdate::running(),
                }),
                AssertStatus(vec![StatusToAssert {
                    collection_index: 0,
                    status: Status::Running,
                    ..Default::default()
                }]),
            ],
        );
    }

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
    fn test_health_operator_namespace_side_channel() {
        use Step::*;

        health_operator_runner(
            2,
            1,
            true,
            vec![
                AssertStatus(vec![
                    // Assert the input started.
                    StatusToAssert {
                        collection_index: 0,
                        status: Status::Starting,
                        ..Default::default()
                    },
                ]),
                // Assert that sidechannel namespaces don't downgrade the status
                //
                // Note that these all happen on the same worker id.
                Update(TestUpdate {
                    worker_id: 0,
                    namespace: StatusNamespace::Ssh,
                    id: None,
                    update: HealthStatusUpdate::stalled("uhoh".to_string(), None),
                }),
                AssertStatus(vec![StatusToAssert {
                    collection_index: 0,
                    status: Status::Stalled,
                    error: Some("ssh: uhoh".to_string()),
                    errors: Some("ssh: uhoh".to_string()),
                    ..Default::default()
                }]),
                Update(TestUpdate {
                    worker_id: 0,
                    namespace: StatusNamespace::Ssh,
                    id: None,
                    update: HealthStatusUpdate::stalled("uhoh2".to_string(), None),
                }),
                AssertStatus(vec![StatusToAssert {
                    collection_index: 0,
                    status: Status::Stalled,
                    error: Some("ssh: uhoh2".to_string()),
                    errors: Some("ssh: uhoh2".to_string()),
                    ..Default::default()
                }]),
                Update(TestUpdate {
                    worker_id: 0,
                    namespace: StatusNamespace::Ssh,
                    id: None,
                    update: HealthStatusUpdate::running(),
                }),
                // We haven't started running yet, as a non-sidechannel namespace hasn't told us to.
                AssertStatus(vec![StatusToAssert {
                    collection_index: 0,
                    status: Status::Starting,
                    ..Default::default()
                }]),
                Update(TestUpdate {
                    worker_id: 0,
                    namespace: StatusNamespace::Generator,
                    id: None,
                    update: HealthStatusUpdate::running(),
                }),
                AssertStatus(vec![StatusToAssert {
                    collection_index: 0,
                    status: Status::Running,
                    ..Default::default()
                }]),
            ],
        );
    }

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
    fn test_health_operator_hints() {
        use Step::*;

        health_operator_runner(
            2,
            1,
            true,
            vec![
                AssertStatus(vec![
                    // Assert the input started.
                    StatusToAssert {
                        collection_index: 0,
                        status: Status::Starting,
                        ..Default::default()
                    },
                ]),
                // Note that these all happen across worker ids.
                Update(TestUpdate {
                    worker_id: 0,
                    namespace: StatusNamespace::Generator,
                    id: None,
                    update: HealthStatusUpdate::stalled(
                        "uhoh".to_string(),
                        Some("hint1".to_string()),
                    ),
                }),
                AssertStatus(vec![StatusToAssert {
                    collection_index: 0,
                    status: Status::Stalled,
                    error: Some("generator: uhoh".to_string()),
                    errors: Some("generator: uhoh".to_string()),
                    hint: Some("hint1".to_string()),
                }]),
                Update(TestUpdate {
                    worker_id: 1,
                    namespace: StatusNamespace::Generator,
                    id: None,
                    update: HealthStatusUpdate::stalled(
                        "uhoh2".to_string(),
                        Some("hint2".to_string()),
                    ),
                }),
                AssertStatus(vec![StatusToAssert {
                    collection_index: 0,
                    status: Status::Stalled,
                    // Note the error sorts later so we just use that.
                    error: Some("generator: uhoh2".to_string()),
                    errors: Some("generator: uhoh2".to_string()),
                    hint: Some("hint1, hint2".to_string()),
                }]),
                // Update one of the hints
                Update(TestUpdate {
                    worker_id: 1,
                    namespace: StatusNamespace::Generator,
                    id: None,
                    update: HealthStatusUpdate::stalled(
                        "uhoh2".to_string(),
                        Some("hint3".to_string()),
                    ),
                }),
                AssertStatus(vec![StatusToAssert {
                    collection_index: 0,
                    status: Status::Stalled,
                    // Note the error sorts later so we just use that.
                    error: Some("generator: uhoh2".to_string()),
                    errors: Some("generator: uhoh2".to_string()),
                    hint: Some("hint1, hint3".to_string()),
                }]),
                // Assert recovery.
                Update(TestUpdate {
                    worker_id: 0,
                    namespace: StatusNamespace::Generator,
                    id: None,
                    update: HealthStatusUpdate::running(),
                }),
                AssertStatus(vec![StatusToAssert {
                    collection_index: 0,
                    status: Status::Stalled,
                    // Note the error sorts later so we just use that.
                    error: Some("generator: uhoh2".to_string()),
                    errors: Some("generator: uhoh2".to_string()),
                    hint: Some("hint3".to_string()),
                }]),
                Update(TestUpdate {
                    worker_id: 1,
                    namespace: StatusNamespace::Generator,
                    id: None,
                    update: HealthStatusUpdate::running(),
                }),
                AssertStatus(vec![StatusToAssert {
                    collection_index: 0,
                    status: Status::Running,
                    ..Default::default()
                }]),
            ],
        );
    }

    // The below is ALL test infrastructure for the above

    use mz_ore::assert_err;
    use timely::container::CapacityContainerBuilder;
    use timely::dataflow::Scope;
    use timely::dataflow::operators::Enter;
    use timely::dataflow::operators::exchange::Exchange;
    use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};

    /// A status to assert.
    #[derive(Debug, Clone, PartialEq, Eq)]
    struct StatusToAssert {
        collection_index: usize,
        status: Status,
        error: Option<String>,
        errors: Option<String>,
        hint: Option<String>,
    }

    impl Default for StatusToAssert {
        fn default() -> Self {
            StatusToAssert {
                collection_index: Default::default(),
                status: Status::Running,
                error: Default::default(),
                errors: Default::default(),
                hint: Default::default(),
            }
        }
    }

    /// An update to push into the operator.
    /// Can come from any worker, and from any input.
    #[derive(Debug, Clone)]
    struct TestUpdate {
        worker_id: u64,
        namespace: StatusNamespace,
        id: Option<GlobalId>,
        update: HealthStatusUpdate,
    }

    #[derive(Debug, Clone)]
    enum Step {
        /// Insert a new health update.
        Update(TestUpdate),
        /// Assert a set of outputs. Note that these should
        /// have unique `collection_index`es.
        AssertStatus(Vec<StatusToAssert>),
    }

    struct TestWriter {
        sender: UnboundedSender<StatusToAssert>,
        input_mapping: BTreeMap<GlobalId, usize>,
    }

    impl HealthOperator for TestWriter {
        fn record_new_status(
            &self,
            collection_id: GlobalId,
            _ts: DateTime<Utc>,
            status: Status,
            new_error: Option<&str>,
            hints: &BTreeSet<String>,
            namespaced_errors: &BTreeMap<StatusNamespace, String>,
            write_namespaced_map: bool,
        ) {
            let _ = self.sender.send(StatusToAssert {
                collection_index: *self.input_mapping.get(&collection_id).unwrap(),
                status,
                error: new_error.map(str::to_string),
                errors: if !namespaced_errors.is_empty() && write_namespaced_map {
                    Some(
                        namespaced_errors
                            .iter()
                            .map(|(ns, err)| format!("{}: {}", ns, err))
                            .join(", "),
                    )
                } else {
                    None
                },
                hint: if !hints.is_empty() {
                    Some(hints.iter().join(", "))
                } else {
                    None
                },
            });
        }

        fn send_halt(&self, _id: GlobalId, _error: Option<(StatusNamespace, HealthStatusUpdate)>) {
            // Not yet unit-tested
            unimplemented!()
        }

        fn chosen_worker(&self) -> Option<usize> {
            // We input and assert outputs on the first worker.
            Some(0)
        }
    }

    /// Set up a `health_operator` with a given number of workers and inputs, and run the given
    /// steps on the first worker.
    fn health_operator_runner(
        workers: usize,
        inputs: usize,
        write_namespaced_map: bool,
        steps: Vec<Step>,
    ) {
        let tokio_runtime = tokio::runtime::Runtime::new().unwrap();
        let tokio_handle = tokio_runtime.handle().clone();

        let inputs: BTreeMap<GlobalId, usize> = (0..inputs)
            .map(|index| (GlobalId::User(u64::cast_from(index)), index))
            .collect();

        timely::execute::execute(
            timely::execute::Config {
                communication: timely::CommunicationConfig::Process(workers),
                worker: Default::default(),
            },
            move |worker| {
                let steps = steps.clone();
                let inputs = inputs.clone();

                let _tokio_guard = tokio_handle.enter();
                let (in_tx, in_rx) = unbounded_channel();
                let (out_tx, mut out_rx) = unbounded_channel();

                worker.dataflow::<(), _, _>(|root_scope| {
                    root_scope
                        .clone()
                        .scoped::<mz_repr::Timestamp, _, _>("gus", |scope| {
                            let input = producer(root_scope.clone(), in_rx).enter(scope);
                            Box::leak(Box::new(health_operator(
                                scope,
                                mz_ore::now::SYSTEM_TIME.clone(),
                                inputs.keys().copied().collect(),
                                *inputs.first_key_value().unwrap().0,
                                "source_test",
                                &input,
                                TestWriter {
                                    sender: out_tx,
                                    input_mapping: inputs,
                                },
                                write_namespaced_map,
                                Duration::from_secs(5),
                            )));
                        });
                });

                // We arbitrarily do all the testing on the first worker.
                if worker.index() == 0 {
                    use Step::*;
                    for step in steps {
                        match step {
                            Update(update) => {
                                let _ = in_tx.send(update);
                            }
                            AssertStatus(mut statuses) => loop {
                                match out_rx.try_recv() {
                                    Err(_) => {
                                        worker.step();
                                        // This makes testing easier.
                                        std::thread::sleep(std::time::Duration::from_millis(50));
                                    }
                                    Ok(update) => {
                                        let pos = statuses
                                            .iter()
                                            .position(|s| {
                                                s.collection_index == update.collection_index
                                            })
                                            .unwrap();

                                        let status_to_assert = &statuses[pos];
                                        assert_eq!(&update, status_to_assert);

                                        statuses.remove(pos);
                                        if statuses.is_empty() {
                                            break;
                                        }
                                    }
                                }
                            },
                        }
                    }

                    // Assert that nothing is left in the channel.
                    assert_err!(out_rx.try_recv());
                }
            },
        )
        .unwrap();
    }

    /// Produces `HealthStatusMessage`s from the input channel.
    ///
    /// Only the first worker is used; all others immediately drop their capabilities and channels.
    /// Once the channel is closed and drained on the first worker, the frontier advances to the
    /// empty frontier. Updates are routed to the worker named by the `TestUpdate`'s `worker_id`
    /// via an exchange.
    fn producer<G: Scope<Timestamp = ()>>(
        scope: G,
        mut input: UnboundedReceiver<TestUpdate>,
    ) -> Stream<G, HealthStatusMessage> {
        let mut iterator = AsyncOperatorBuilder::new("iterator".to_string(), scope.clone());
        let (output_handle, output) = iterator.new_output::<CapacityContainerBuilder<Vec<_>>>();

        let index = scope.index();
        iterator.build(|mut caps| async move {
            // We input and assert outputs on the first worker.
            if index != 0 {
                return;
            }
            let mut capability = Some(caps.pop().unwrap());
            while let Some(element) = input.recv().await {
                output_handle.give(
                    capability.as_ref().unwrap(),
                    (
                        element.worker_id,
                        element.id,
                        element.namespace,
                        element.update,
                    ),
                );
            }

            capability.take();
        });

        let output = output.exchange(|d| d.0).map(|d| HealthStatusMessage {
            id: d.1,
            namespace: d.2,
            update: d.3,
        });

        output
    }
}