mz_storage/
healthcheck.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Healthcheck common
11
12use std::cell::RefCell;
13use std::collections::btree_map::Entry;
14use std::collections::{BTreeMap, BTreeSet};
15use std::fmt;
16use std::fmt::Debug;
17use std::rc::Rc;
18use std::time::Duration;
19
20use chrono::{DateTime, Utc};
21use differential_dataflow::Hashable;
22use futures::StreamExt;
23use mz_ore::cast::CastFrom;
24use mz_ore::now::NowFn;
25use mz_repr::GlobalId;
26use mz_storage_client::client::{Status, StatusUpdate};
27use mz_timely_util::builder_async::{
28    Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
29};
30use timely::dataflow::channels::pact::Exchange;
31use timely::dataflow::operators::{Enter, Map};
32use timely::dataflow::scopes::Child;
33use timely::dataflow::{Scope, Stream};
34use tracing::{error, info};
35
36use crate::internal_control::{InternalCommandSender, InternalStorageCommand};
37
/// The namespace of the update. The `Ord` impl matters here, later variants are
/// displayed over earlier ones.
///
/// Some namespaces (referred to as "sidechannels") can come from any worker_id,
/// and `Running` statuses from them do not mark the entire object as running.
///
/// Ensure you update `is_sidechannel` when adding variants.
#[derive(Copy, Clone, Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub enum StatusNamespace {
    /// A normal status namespace. Any `Running` status from any worker will mark the object
    /// `Running`.
    Generator,
    /// Kafka-related statuses.
    Kafka,
    /// Postgres-related statuses.
    Postgres,
    /// MySQL-related statuses.
    MySql,
    /// SQL Server-related statuses.
    SqlServer,
    /// SSH-related statuses. This is a "sidechannel" namespace (see
    /// `is_sidechannel`): its `Running` updates do not mark the object running.
    Ssh,
    /// Upsert-related statuses.
    Upsert,
    /// Decode-related statuses.
    Decode,
    /// Internal statuses.
    Internal,
}
59
60impl StatusNamespace {
61    fn is_sidechannel(&self) -> bool {
62        matches!(self, StatusNamespace::Ssh)
63    }
64}
65
66impl fmt::Display for StatusNamespace {
67    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
68        use StatusNamespace::*;
69        match self {
70            Generator => write!(f, "generator"),
71            Kafka => write!(f, "kafka"),
72            Postgres => write!(f, "postgres"),
73            MySql => write!(f, "mysql"),
74            SqlServer => write!(f, "sql-server"),
75            Ssh => write!(f, "ssh"),
76            Upsert => write!(f, "upsert"),
77            Decode => write!(f, "decode"),
78            Internal => write!(f, "internal"),
79        }
80    }
81}
82
/// The health statuses of a single object, tracked per worker and per namespace.
#[derive(Debug)]
struct PerWorkerHealthStatus {
    /// One map per worker index: the latest merged update seen from that
    /// worker in each namespace.
    pub(crate) errors_by_worker: Vec<BTreeMap<StatusNamespace, HealthStatusUpdate>>,
}
87
impl PerWorkerHealthStatus {
    /// Merge `update` into the state tracked for `worker` under `namespace`.
    ///
    /// When `only_greater` is true, an existing entry is replaced only if the
    /// new update compares strictly greater (per `HealthStatusUpdate`'s
    /// derived `Ord`, where later variants take precedence); otherwise the
    /// new update unconditionally overwrites the previous one.
    fn merge_update(
        &mut self,
        worker: usize,
        namespace: StatusNamespace,
        update: HealthStatusUpdate,
        only_greater: bool,
    ) {
        let errors = &mut self.errors_by_worker[worker];
        match errors.entry(namespace) {
            Entry::Vacant(v) => {
                v.insert(update);
            }
            Entry::Occupied(mut o) => {
                if !only_greater || o.get() < &update {
                    o.insert(update);
                }
            }
        }
    }

    /// Fold all per-worker, per-namespace statuses into a single
    /// `OverallStatus` for the object.
    ///
    /// - Any error (`Stalled` or `Ceased`) wins over everything else. Within a
    ///   namespace the greatest error string (by `str` ordering) is kept; the
    ///   displayed error comes from the greatest namespace (per
    ///   `StatusNamespace`'s `Ord`).
    /// - A `Running` update only promotes the object to `Running` when it
    ///   comes from a non-sidechannel namespace.
    /// - With no promoting updates and no errors the result stays `Starting`.
    fn decide_status(&self) -> OverallStatus {
        let mut output_status = OverallStatus::Starting;
        let mut namespaced_errors: BTreeMap<StatusNamespace, String> = BTreeMap::new();
        let mut hints: BTreeSet<String> = BTreeSet::new();

        for status in self.errors_by_worker.iter() {
            for (ns, ns_status) in status.iter() {
                match ns_status {
                    // HealthStatusUpdate::Ceased is currently unused, so just
                    // treat it as if it were a normal error.
                    //
                    // TODO: redesign ceased status database-issues#7687
                    HealthStatusUpdate::Ceased { error } => {
                        if Some(error) > namespaced_errors.get(ns).as_deref() {
                            namespaced_errors.insert(*ns, error.to_string());
                        }
                    }
                    HealthStatusUpdate::Stalled { error, hint, .. } => {
                        if Some(error) > namespaced_errors.get(ns).as_deref() {
                            namespaced_errors.insert(*ns, error.to_string());
                        }

                        if let Some(hint) = hint {
                            hints.insert(hint.to_string());
                        }
                    }
                    HealthStatusUpdate::Running => {
                        if !ns.is_sidechannel() {
                            output_status = OverallStatus::Running;
                        }
                    }
                }
            }
        }

        // Errors take precedence over any `Running` decided above.
        if !namespaced_errors.is_empty() {
            // Pick the most important error.
            let (ns, err) = namespaced_errors.last_key_value().unwrap();
            output_status = OverallStatus::Stalled {
                error: format!("{}: {}", ns, err),
                hints,
                namespaced_errors,
            }
        }

        output_status
    }
}
157
/// The overall status of an object, as decided from the per-worker,
/// per-namespace updates in `PerWorkerHealthStatus::decide_status`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub enum OverallStatus {
    /// No promoting `Running` update (and no error) has been seen yet.
    Starting,
    /// At least one worker reported `Running` in a non-sidechannel namespace,
    /// and no errors are outstanding.
    Running,
    /// At least one namespace reported an error.
    Stalled {
        /// The user-facing error, rendered from the greatest namespace.
        error: String,
        /// De-duplicated hints collected from all stalled updates.
        hints: BTreeSet<String>,
        /// The retained error string for each namespace that reported one.
        namespaced_errors: BTreeMap<StatusNamespace, String>,
    },
    /// The object has ceased.
    Ceased {
        /// The user-facing error.
        error: String,
    },
}
171
172impl OverallStatus {
173    /// The user-readable error string, if there is one.
174    pub(crate) fn error(&self) -> Option<&str> {
175        match self {
176            OverallStatus::Starting | OverallStatus::Running => None,
177            OverallStatus::Stalled { error, .. } | OverallStatus::Ceased { error, .. } => {
178                Some(error)
179            }
180        }
181    }
182
183    /// A set of namespaced errors, if there are any.
184    pub(crate) fn errors(&self) -> Option<&BTreeMap<StatusNamespace, String>> {
185        match self {
186            OverallStatus::Starting | OverallStatus::Running | OverallStatus::Ceased { .. } => None,
187            OverallStatus::Stalled {
188                namespaced_errors, ..
189            } => Some(namespaced_errors),
190        }
191    }
192
193    /// A set of hints, if there are any.
194    pub(crate) fn hints(&self) -> BTreeSet<String> {
195        match self {
196            OverallStatus::Starting | OverallStatus::Running | OverallStatus::Ceased { .. } => {
197                BTreeSet::new()
198            }
199            OverallStatus::Stalled { hints, .. } => hints.clone(),
200        }
201    }
202}
203
/// Project an `OverallStatus` onto the client-facing `Status` enum,
/// dropping the error/hint payloads.
impl<'a> From<&'a OverallStatus> for Status {
    fn from(val: &'a OverallStatus) -> Self {
        match val {
            OverallStatus::Starting => Status::Starting,
            OverallStatus::Running => Status::Running,
            OverallStatus::Stalled { .. } => Status::Stalled,
            OverallStatus::Ceased { .. } => Status::Ceased,
        }
    }
}
214
/// Per-object bookkeeping held by the `health_operator`.
#[derive(Debug)]
struct HealthState {
    /// The per-worker, per-namespace statuses received for this object.
    healths: PerWorkerHealthStatus,
    /// The last status written out, used to suppress duplicate writes.
    last_reported_status: Option<OverallStatus>,
    /// Set when an update requested a halt, recording the triggering
    /// namespace and update.
    halt_with: Option<(StatusNamespace, HealthStatusUpdate)>,
}
221
222impl HealthState {
223    fn new(worker_count: usize) -> HealthState {
224        HealthState {
225            healths: PerWorkerHealthStatus {
226                errors_by_worker: vec![Default::default(); worker_count],
227            },
228            last_reported_status: None,
229            halt_with: None,
230        }
231    }
232}
233
/// A trait that lets a user configure the `health_operator` with custom
/// behavior. This is mostly useful for testing, and the [`DefaultWriter`]
/// should be the correct implementation for everyone.
pub trait HealthOperator {
    /// Record a new status.
    fn record_new_status(
        &self,
        collection_id: GlobalId,
        ts: DateTime<Utc>,
        new_status: Status,
        new_error: Option<&str>,
        hints: &BTreeSet<String>,
        namespaced_errors: &BTreeMap<StatusNamespace, String>,
        // TODO(guswynn): not urgent:
        // Ideally this would be entirely included in the `DefaultWriter`, but that
        // requires a fairly heavy change to the `health_operator`, which hardcodes
        // some use of persist. For now we just leave it and ignore it in tests.
        write_namespaced_map: bool,
    );
    /// Request that the object identified by `id` be halted, carrying the
    /// namespaced update (if any) that triggered the halt.
    fn send_halt(&self, id: GlobalId, error: Option<(StatusNamespace, HealthStatusUpdate)>);

    /// Optionally override the chosen worker index. Default is semi-random.
    /// Only useful for tests.
    fn chosen_worker(&self) -> Option<usize> {
        None
    }
}
261
/// A default `HealthOperator` for use in normal cases.
pub struct DefaultWriter {
    /// Channel for sending internal storage commands (used to request
    /// suspend-and-restart).
    pub command_tx: InternalCommandSender,
    /// Shared buffer into which newly recorded status updates are pushed.
    pub updates: Rc<RefCell<Vec<StatusUpdate>>>,
}
267
268impl HealthOperator for DefaultWriter {
269    fn record_new_status(
270        &self,
271        collection_id: GlobalId,
272        ts: DateTime<Utc>,
273        status: Status,
274        new_error: Option<&str>,
275        hints: &BTreeSet<String>,
276        namespaced_errors: &BTreeMap<StatusNamespace, String>,
277        write_namespaced_map: bool,
278    ) {
279        self.updates.borrow_mut().push(StatusUpdate {
280            id: collection_id,
281            timestamp: ts,
282            status,
283            error: new_error.map(|e| e.to_string()),
284            hints: hints.clone(),
285            namespaced_errors: if write_namespaced_map {
286                namespaced_errors
287                    .iter()
288                    .map(|(ns, val)| (ns.to_string(), val.clone()))
289                    .collect()
290            } else {
291                BTreeMap::new()
292            },
293            replica_id: None,
294        });
295    }
296
297    fn send_halt(&self, id: GlobalId, error: Option<(StatusNamespace, HealthStatusUpdate)>) {
298        self.command_tx
299            .send(InternalStorageCommand::SuspendAndRestart {
300                // Suspend and restart is expected to operate on the primary object and
301                // not any of the sub-objects
302                id,
303                reason: format!("{:?}", error),
304            });
305    }
306}
307
/// A health message consumed by the `health_operator`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub struct HealthStatusMessage {
    /// The object that this status message is about. When None, it refers to the entire ingestion
    /// as a whole. When Some, it refers to a specific subsource. (In
    /// `health_operator`, `None` is resolved to the primary/halting id.)
    pub id: Option<GlobalId>,
    /// The namespace of the health update.
    pub namespace: StatusNamespace,
    /// The update itself.
    pub update: HealthStatusUpdate,
}
319
320/// Writes updates that come across `health_stream` to the collection's status shards, as identified
321/// by their `CollectionMetadata`.
322///
323/// Only one worker will be active and write to the status shard.
324///
325/// The `OutputIndex` values that come across `health_stream` must be a strict subset of those in
326/// `configs`'s keys.
327pub(crate) fn health_operator<'g, G, P>(
328    scope: &Child<'g, G, mz_repr::Timestamp>,
329    now: NowFn,
330    // A set of id's that should be marked as `HealthStatusUpdate::starting()` during startup.
331    mark_starting: BTreeSet<GlobalId>,
332    // An id that is allowed to halt the dataflow. Others are ignored, and panic during debug
333    // mode.
334    halting_id: GlobalId,
335    // A description of the object type we are writing status updates about. Used in log lines.
336    object_type: &'static str,
337    // An indexed stream of health updates. Indexes are configured in `configs`.
338    health_stream: &Stream<G, HealthStatusMessage>,
339    // An impl of `HealthOperator` that configures the output behavior of this operator.
340    health_operator_impl: P,
341    // Whether or not we should actually write namespaced errors in the `details` column.
342    write_namespaced_map: bool,
343    // How long to wait before initiating a `SuspendAndRestart` command, to
344    // prevent hot restart loops.
345    suspend_and_restart_delay: Duration,
346) -> PressOnDropButton
347where
348    G: Scope<Timestamp = ()>,
349    P: HealthOperator + 'static,
350{
351    // Derived config options
352    let healthcheck_worker_id = scope.index();
353    let worker_count = scope.peers();
354
355    // Inject the originating worker id to each item before exchanging to the chosen worker
356    let health_stream = health_stream.map(move |status| (healthcheck_worker_id, status));
357
358    let chosen_worker_id = if let Some(index) = health_operator_impl.chosen_worker() {
359        index
360    } else {
361        // We'll route all the work to a single arbitrary worker;
362        // there's not much to do, and we need a global view.
363        usize::cast_from(mark_starting.iter().next().hashed()) % worker_count
364    };
365
366    let is_active_worker = chosen_worker_id == healthcheck_worker_id;
367
368    let operator_name = format!("healthcheck({})", healthcheck_worker_id);
369    let mut health_op = AsyncOperatorBuilder::new(operator_name, scope.clone());
370
371    let health = health_stream.enter(scope);
372
373    let mut input = health_op.new_disconnected_input(
374        &health,
375        Exchange::new(move |_| u64::cast_from(chosen_worker_id)),
376    );
377
378    let button = health_op.build(move |mut _capabilities| async move {
379        let mut health_states: BTreeMap<_, _> = mark_starting
380            .iter()
381            .copied()
382            .chain([halting_id])
383            .map(|id| (id, HealthState::new(worker_count)))
384            .collect();
385
386        // Write the initial starting state to the status shard for all managed objects
387        if is_active_worker {
388            for (id, state) in health_states.iter_mut() {
389                if mark_starting.contains(id) {
390                    let status = OverallStatus::Starting;
391                    let timestamp = mz_ore::now::to_datetime(now());
392                    health_operator_impl.record_new_status(
393                        *id,
394                        timestamp,
395                        (&status).into(),
396                        status.error(),
397                        &status.hints(),
398                        status.errors().unwrap_or(&BTreeMap::new()),
399                        write_namespaced_map,
400                    );
401
402                    state.last_reported_status = Some(status);
403                }
404            }
405        }
406
407        let mut outputs_seen = BTreeMap::<GlobalId, BTreeSet<_>>::new();
408        while let Some(event) = input.next().await {
409            if let AsyncEvent::Data(_cap, rows) = event {
410                for (worker_id, message) in rows {
411                    let HealthStatusMessage {
412                        id,
413                        namespace: ns,
414                        update: health_event,
415                    } = message;
416                    let id = id.unwrap_or(halting_id);
417                    let HealthState {
418                        healths, halt_with, ..
419                    } = match health_states.get_mut(&id) {
420                        Some(health) => health,
421                        // This is a health status update for a sub-object_type that we did not request to
422                        // be generated, which means it doesn't have a GlobalId and should not be
423                        // propagated to the shard.
424                        None => continue,
425                    };
426
427                    // Its important to track `new_round` per-namespace, so namespaces are reasoned
428                    // about in `merge_update` independently.
429                    let new_round = outputs_seen
430                        .entry(id)
431                        .or_insert_with(BTreeSet::new)
432                        .insert(ns.clone());
433
434                    if !is_active_worker {
435                        error!(
436                            "Health messages for {object_type} {id} passed to \
437                              an unexpected worker id: {healthcheck_worker_id}"
438                        )
439                    }
440
441                    if health_event.should_halt() {
442                        *halt_with = Some((ns.clone(), health_event.clone()));
443                    }
444
445                    healths.merge_update(worker_id, ns, health_event, !new_round);
446                }
447
448                let mut halt_with_outer = None;
449
450                while let Some((id, _)) = outputs_seen.pop_first() {
451                    let HealthState {
452                        healths,
453                        last_reported_status,
454                        halt_with,
455                    } = health_states.get_mut(&id).expect("known to exist");
456
457                    let new_status = healths.decide_status();
458
459                    if Some(&new_status) != last_reported_status.as_ref() {
460                        info!(
461                            "Health transition for {object_type} {id}: \
462                                  {last_reported_status:?} -> {:?}",
463                            Some(&new_status)
464                        );
465
466                        let timestamp = mz_ore::now::to_datetime(now());
467                        health_operator_impl.record_new_status(
468                            id,
469                            timestamp,
470                            (&new_status).into(),
471                            new_status.error(),
472                            &new_status.hints(),
473                            new_status.errors().unwrap_or(&BTreeMap::new()),
474                            write_namespaced_map,
475                        );
476
477                        *last_reported_status = Some(new_status.clone());
478                    }
479
480                    // Set halt with if None.
481                    if halt_with_outer.is_none() && halt_with.is_some() {
482                        halt_with_outer = Some((id, halt_with.clone()));
483                    }
484                }
485
486                // TODO(aljoscha): Instead of threading through the
487                // `should_halt` bit, we can give an internal command sender
488                // directly to the places where `should_halt = true` originates.
489                // We should definitely do that, but this is okay for a PoC.
490                if let Some((id, halt_with)) = halt_with_outer {
491                    mz_ore::soft_assert_or_log!(
492                        id == halting_id,
493                        "sub{object_type}s should not produce \
494                        halting errors, however {:?} halted while primary \
495                                            {object_type} is {:?}",
496                        id,
497                        halting_id
498                    );
499
500                    info!(
501                        "Broadcasting suspend-and-restart \
502                        command because of {:?} after {:?} delay",
503                        halt_with, suspend_and_restart_delay
504                    );
505                    tokio::time::sleep(suspend_and_restart_delay).await;
506                    health_operator_impl.send_halt(id, halt_with);
507                }
508            }
509        }
510    });
511
512    button.press_on_drop()
513}
514
515use serde::{Deserialize, Serialize};
516
/// NB: we derive Ord here, so the enum order matters. Generally, statuses later in the list
/// take precedence over earlier ones: so if one worker is stalled, we'll consider the entire
/// source to be stalled.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub enum HealthStatusUpdate {
    /// The object is operating normally.
    Running,
    /// The object is stalled with an error; optionally carries a user-facing
    /// hint, and may request that the whole dataflow be halted and restarted.
    Stalled {
        error: String,
        hint: Option<String>,
        should_halt: bool,
    },
    /// The object has permanently ceased. Currently unused; treated like a
    /// normal error where it is consumed (TODO: database-issues#7687).
    Ceased {
        error: String,
    },
}
532
533impl HealthStatusUpdate {
534    /// Generates a running [`HealthStatusUpdate`].
535    pub(crate) fn running() -> Self {
536        HealthStatusUpdate::Running
537    }
538
539    /// Generates a non-halting [`HealthStatusUpdate`] with `update`.
540    pub(crate) fn stalled(error: String, hint: Option<String>) -> Self {
541        HealthStatusUpdate::Stalled {
542            error,
543            hint,
544            should_halt: false,
545        }
546    }
547
548    /// Generates a halting [`HealthStatusUpdate`] with `update`.
549    pub(crate) fn halting(error: String, hint: Option<String>) -> Self {
550        HealthStatusUpdate::Stalled {
551            error,
552            hint,
553            should_halt: true,
554        }
555    }
556
557    // TODO: redesign ceased status database-issues#7687
558    // Generates a ceasing [`HealthStatusUpdate`] with `update`.
559    // pub(crate) fn ceasing(error: String) -> Self {
560    //     HealthStatusUpdate::Ceased { error }
561    // }
562
563    /// Whether or not we should halt the dataflow instances and restart it.
564    pub(crate) fn should_halt(&self) -> bool {
565        match self {
566            HealthStatusUpdate::Running |
567            // HealthStatusUpdate::Ceased should never halt because it can occur
568            // at the subsource level and should not cause the entire dataflow
569            // to halt. Instead, the dataflow itself should handle shutting
570            // itself down if need be.
571            HealthStatusUpdate::Ceased { .. } => false,
572            HealthStatusUpdate::Stalled { should_halt, .. } => *should_halt,
573        }
574    }
575}
576
577#[cfg(test)]
578mod tests {
579    use super::*;
580    use itertools::Itertools;
581
582    // Actual timely tests for `health_operator`.
583
584    #[mz_ore::test]
585    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
586    fn test_health_operator_basic() {
587        use Step::*;
588
589        // Test 2 inputs across 2 workers.
590        health_operator_runner(
591            2,
592            2,
593            true,
594            vec![
595                AssertStatus(vec![
596                    // Assert both inputs started.
597                    StatusToAssert {
598                        collection_index: 0,
599                        status: Status::Starting,
600                        ..Default::default()
601                    },
602                    StatusToAssert {
603                        collection_index: 1,
604                        status: Status::Starting,
605                        ..Default::default()
606                    },
607                ]),
608                // Update and assert one is running.
609                Update(TestUpdate {
610                    worker_id: 1,
611                    namespace: StatusNamespace::Generator,
612                    id: None,
613                    update: HealthStatusUpdate::running(),
614                }),
615                AssertStatus(vec![StatusToAssert {
616                    collection_index: 0,
617                    status: Status::Running,
618                    ..Default::default()
619                }]),
620                // Assert the other can be stalled by 1 worker.
621                //
622                // TODO(guswynn): ideally we could push these updates
623                // at the same time, but because they are coming from separately
624                // workers, they could end up in different rounds, causing flakes.
625                // For now, we just do this.
626                Update(TestUpdate {
627                    worker_id: 1,
628                    namespace: StatusNamespace::Generator,
629                    id: Some(GlobalId::User(1)),
630                    update: HealthStatusUpdate::running(),
631                }),
632                AssertStatus(vec![StatusToAssert {
633                    collection_index: 1,
634                    status: Status::Running,
635                    ..Default::default()
636                }]),
637                Update(TestUpdate {
638                    worker_id: 0,
639                    namespace: StatusNamespace::Generator,
640                    id: Some(GlobalId::User(1)),
641                    update: HealthStatusUpdate::stalled("uhoh".to_string(), None),
642                }),
643                AssertStatus(vec![StatusToAssert {
644                    collection_index: 1,
645                    status: Status::Stalled,
646                    error: Some("generator: uhoh".to_string()),
647                    errors: Some("generator: uhoh".to_string()),
648                    ..Default::default()
649                }]),
650                // And that it can recover.
651                Update(TestUpdate {
652                    worker_id: 0,
653                    namespace: StatusNamespace::Generator,
654                    id: Some(GlobalId::User(1)),
655                    update: HealthStatusUpdate::running(),
656                }),
657                AssertStatus(vec![StatusToAssert {
658                    collection_index: 1,
659                    status: Status::Running,
660                    ..Default::default()
661                }]),
662            ],
663        );
664    }
665
666    #[mz_ore::test]
667    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
668    fn test_health_operator_write_namespaced_map() {
669        use Step::*;
670
671        // Test 2 inputs across 2 workers.
672        health_operator_runner(
673            2,
674            2,
675            // testing this
676            false,
677            vec![
678                AssertStatus(vec![
679                    // Assert both inputs started.
680                    StatusToAssert {
681                        collection_index: 0,
682                        status: Status::Starting,
683                        ..Default::default()
684                    },
685                    StatusToAssert {
686                        collection_index: 1,
687                        status: Status::Starting,
688                        ..Default::default()
689                    },
690                ]),
691                Update(TestUpdate {
692                    worker_id: 0,
693                    namespace: StatusNamespace::Generator,
694                    id: Some(GlobalId::User(1)),
695                    update: HealthStatusUpdate::stalled("uhoh".to_string(), None),
696                }),
697                AssertStatus(vec![StatusToAssert {
698                    collection_index: 1,
699                    status: Status::Stalled,
700                    error: Some("generator: uhoh".to_string()),
701                    errors: None,
702                    ..Default::default()
703                }]),
704            ],
705        )
706    }
707
708    #[mz_ore::test]
709    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
710    fn test_health_operator_namespaces() {
711        use Step::*;
712
713        // Test 2 inputs across 2 workers.
714        health_operator_runner(
715            2,
716            1,
717            true,
718            vec![
719                AssertStatus(vec![
720                    // Assert both inputs started.
721                    StatusToAssert {
722                        collection_index: 0,
723                        status: Status::Starting,
724                        ..Default::default()
725                    },
726                ]),
727                // Assert that we merge namespaced errors correctly.
728                //
729                // Note that these all happen on the same worker id.
730                Update(TestUpdate {
731                    worker_id: 0,
732                    namespace: StatusNamespace::Generator,
733                    id: None,
734                    update: HealthStatusUpdate::stalled("uhoh".to_string(), None),
735                }),
736                AssertStatus(vec![StatusToAssert {
737                    collection_index: 0,
738                    status: Status::Stalled,
739                    error: Some("generator: uhoh".to_string()),
740                    errors: Some("generator: uhoh".to_string()),
741                    ..Default::default()
742                }]),
743                Update(TestUpdate {
744                    worker_id: 0,
745                    namespace: StatusNamespace::Kafka,
746                    id: None,
747                    update: HealthStatusUpdate::stalled("uhoh".to_string(), None),
748                }),
749                AssertStatus(vec![StatusToAssert {
750                    collection_index: 0,
751                    status: Status::Stalled,
752                    error: Some("kafka: uhoh".to_string()),
753                    errors: Some("generator: uhoh, kafka: uhoh".to_string()),
754                    ..Default::default()
755                }]),
756                // And that it can recover.
757                Update(TestUpdate {
758                    worker_id: 0,
759                    namespace: StatusNamespace::Kafka,
760                    id: None,
761                    update: HealthStatusUpdate::running(),
762                }),
763                AssertStatus(vec![StatusToAssert {
764                    collection_index: 0,
765                    status: Status::Stalled,
766                    error: Some("generator: uhoh".to_string()),
767                    errors: Some("generator: uhoh".to_string()),
768                    ..Default::default()
769                }]),
770                Update(TestUpdate {
771                    worker_id: 0,
772                    namespace: StatusNamespace::Generator,
773                    id: None,
774                    update: HealthStatusUpdate::running(),
775                }),
776                AssertStatus(vec![StatusToAssert {
777                    collection_index: 0,
778                    status: Status::Running,
779                    ..Default::default()
780                }]),
781            ],
782        );
783    }
784
    /// Verifies that a "sidechannel" namespace (here `Ssh`) can stall the object,
    /// but that a `running` update from it alone cannot move the object to
    /// `Running` — only a default namespace (here `Generator`) can do that.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
    fn test_health_operator_namespace_side_channel() {
        use Step::*;

        health_operator_runner(
            2,
            1,
            true,
            vec![
                AssertStatus(vec![
                    // Assert the input started.
                    StatusToAssert {
                        collection_index: 0,
                        status: Status::Starting,
                        ..Default::default()
                    },
                ]),
                // Assert that sidechannel namespaces don't downgrade the status
                //
                // Note that these all happen on the same worker id.
                Update(TestUpdate {
                    worker_id: 0,
                    namespace: StatusNamespace::Ssh,
                    id: None,
                    update: HealthStatusUpdate::stalled("uhoh".to_string(), None),
                }),
                AssertStatus(vec![StatusToAssert {
                    collection_index: 0,
                    status: Status::Stalled,
                    error: Some("ssh: uhoh".to_string()),
                    errors: Some("ssh: uhoh".to_string()),
                    ..Default::default()
                }]),
                // A second stall in the same namespace replaces the first error.
                Update(TestUpdate {
                    worker_id: 0,
                    namespace: StatusNamespace::Ssh,
                    id: None,
                    update: HealthStatusUpdate::stalled("uhoh2".to_string(), None),
                }),
                AssertStatus(vec![StatusToAssert {
                    collection_index: 0,
                    status: Status::Stalled,
                    error: Some("ssh: uhoh2".to_string()),
                    errors: Some("ssh: uhoh2".to_string()),
                    ..Default::default()
                }]),
                Update(TestUpdate {
                    worker_id: 0,
                    namespace: StatusNamespace::Ssh,
                    id: None,
                    update: HealthStatusUpdate::running(),
                }),
                // We haven't started running yet, as a `Default` namespace hasn't told us.
                AssertStatus(vec![StatusToAssert {
                    collection_index: 0,
                    status: Status::Starting,
                    ..Default::default()
                }]),
                Update(TestUpdate {
                    worker_id: 0,
                    namespace: StatusNamespace::Generator,
                    id: None,
                    update: HealthStatusUpdate::running(),
                }),
                AssertStatus(vec![StatusToAssert {
                    collection_index: 0,
                    status: Status::Running,
                    ..Default::default()
                }]),
            ],
        );
    }
858
    /// Verifies that hints from multiple workers are merged (comma-separated),
    /// can be updated per-worker, and are cleared per-worker once each worker
    /// reports `running`.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
    fn test_health_operator_hints() {
        use Step::*;

        health_operator_runner(
            2,
            1,
            true,
            vec![
                AssertStatus(vec![
                    // Assert the input started.
                    StatusToAssert {
                        collection_index: 0,
                        status: Status::Starting,
                        ..Default::default()
                    },
                ]),
                // Note that these all happen across worker ids.
                Update(TestUpdate {
                    worker_id: 0,
                    namespace: StatusNamespace::Generator,
                    id: None,
                    update: HealthStatusUpdate::stalled(
                        "uhoh".to_string(),
                        Some("hint1".to_string()),
                    ),
                }),
                AssertStatus(vec![StatusToAssert {
                    collection_index: 0,
                    status: Status::Stalled,
                    error: Some("generator: uhoh".to_string()),
                    errors: Some("generator: uhoh".to_string()),
                    hint: Some("hint1".to_string()),
                }]),
                Update(TestUpdate {
                    worker_id: 1,
                    namespace: StatusNamespace::Generator,
                    id: None,
                    update: HealthStatusUpdate::stalled(
                        "uhoh2".to_string(),
                        Some("hint2".to_string()),
                    ),
                }),
                // Hints from both workers are now present, joined with ", ".
                AssertStatus(vec![StatusToAssert {
                    collection_index: 0,
                    status: Status::Stalled,
                    // Note the error sorts later so we just use that.
                    error: Some("generator: uhoh2".to_string()),
                    errors: Some("generator: uhoh2".to_string()),
                    hint: Some("hint1, hint2".to_string()),
                }]),
                // Update one of the hints
                Update(TestUpdate {
                    worker_id: 1,
                    namespace: StatusNamespace::Generator,
                    id: None,
                    update: HealthStatusUpdate::stalled(
                        "uhoh2".to_string(),
                        Some("hint3".to_string()),
                    ),
                }),
                AssertStatus(vec![StatusToAssert {
                    collection_index: 0,
                    status: Status::Stalled,
                    // Note the error sorts later so we just use that.
                    error: Some("generator: uhoh2".to_string()),
                    errors: Some("generator: uhoh2".to_string()),
                    hint: Some("hint1, hint3".to_string()),
                }]),
                // Assert recovery.
                Update(TestUpdate {
                    worker_id: 0,
                    namespace: StatusNamespace::Generator,
                    id: None,
                    update: HealthStatusUpdate::running(),
                }),
                // Worker 0 recovered, so its hint is gone; worker 1 is still stalled.
                AssertStatus(vec![StatusToAssert {
                    collection_index: 0,
                    status: Status::Stalled,
                    // Note the error sorts later so we just use that.
                    error: Some("generator: uhoh2".to_string()),
                    errors: Some("generator: uhoh2".to_string()),
                    hint: Some("hint3".to_string()),
                }]),
                Update(TestUpdate {
                    worker_id: 1,
                    namespace: StatusNamespace::Generator,
                    id: None,
                    update: HealthStatusUpdate::running(),
                }),
                AssertStatus(vec![StatusToAssert {
                    collection_index: 0,
                    status: Status::Running,
                    ..Default::default()
                }]),
            ],
        );
    }
958
959    // The below is ALL test infrastructure for the above
960
961    use mz_ore::assert_err;
962    use timely::container::CapacityContainerBuilder;
963    use timely::dataflow::Scope;
964    use timely::dataflow::operators::exchange::Exchange;
965    use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
966
    /// A status to assert, as rendered by `TestWriter::record_new_status`.
    #[derive(Debug, Clone, PartialEq, Eq)]
    struct StatusToAssert {
        // Index of the input collection this status belongs to.
        collection_index: usize,
        // The expected status value.
        status: Status,
        // The expected error text, if any.
        error: Option<String>,
        // The expected namespaced error map, rendered as "ns: err, ns: err".
        errors: Option<String>,
        // The expected hints, joined with ", ".
        hint: Option<String>,
    }
976
977    impl Default for StatusToAssert {
978        fn default() -> Self {
979            StatusToAssert {
980                collection_index: Default::default(),
981                status: Status::Running,
982                error: Default::default(),
983                errors: Default::default(),
984                hint: Default::default(),
985            }
986        }
987    }
988
    /// An update to push into the operator.
    /// Can come from any worker, and from any input.
    #[derive(Debug, Clone)]
    struct TestUpdate {
        // The worker this update is routed to (see the exchange in `producer`).
        worker_id: u64,
        // The namespace the update is reported under.
        namespace: StatusNamespace,
        // The object the update refers to; `None` presumably targets the
        // top-level object — TODO confirm against `health_operator`.
        id: Option<GlobalId>,
        // The health status payload itself.
        update: HealthStatusUpdate,
    }
998
    /// A single step in a health-operator test scenario.
    #[derive(Debug, Clone)]
    enum Step {
        /// Insert a new health update.
        Update(TestUpdate),
        /// Assert a set of outputs. Note that each entry should
        /// have a unique `collection_index`.
        AssertStatus(Vec<StatusToAssert>),
    }
1007
    /// A `HealthOperator` implementation that forwards every recorded status
    /// to a channel so the test driver can assert on it.
    struct TestWriter {
        // Where rendered statuses are sent for assertion.
        sender: UnboundedSender<StatusToAssert>,
        // Maps each collection's `GlobalId` back to its input index.
        input_mapping: BTreeMap<GlobalId, usize>,
    }
1012
1013    impl HealthOperator for TestWriter {
1014        fn record_new_status(
1015            &self,
1016            collection_id: GlobalId,
1017            _ts: DateTime<Utc>,
1018            status: Status,
1019            new_error: Option<&str>,
1020            hints: &BTreeSet<String>,
1021            namespaced_errors: &BTreeMap<StatusNamespace, String>,
1022            write_namespaced_map: bool,
1023        ) {
1024            let _ = self.sender.send(StatusToAssert {
1025                collection_index: *self.input_mapping.get(&collection_id).unwrap(),
1026                status,
1027                error: new_error.map(str::to_string),
1028                errors: if !namespaced_errors.is_empty() && write_namespaced_map {
1029                    Some(
1030                        namespaced_errors
1031                            .iter()
1032                            .map(|(ns, err)| format!("{}: {}", ns, err))
1033                            .join(", "),
1034                    )
1035                } else {
1036                    None
1037                },
1038                hint: if !hints.is_empty() {
1039                    Some(hints.iter().join(", "))
1040                } else {
1041                    None
1042                },
1043            });
1044        }
1045
1046        fn send_halt(&self, _id: GlobalId, _error: Option<(StatusNamespace, HealthStatusUpdate)>) {
1047            // Not yet unit-tested
1048            unimplemented!()
1049        }
1050
1051        fn chosen_worker(&self) -> Option<usize> {
1052            // We input and assert outputs on the first worker.
1053            Some(0)
1054        }
1055    }
1056
    /// Setup a `health_operator` with a set number of workers and inputs, and the
    /// steps on the first worker.
    ///
    /// `write_namespaced_map` is forwarded to the operator and controls whether
    /// the namespaced error map is recorded (see `TestWriter::record_new_status`).
    fn health_operator_runner(
        workers: usize,
        inputs: usize,
        write_namespaced_map: bool,
        steps: Vec<Step>,
    ) {
        // The operator runs async work, so a tokio runtime is entered for the
        // duration of the dataflow build (see the `enter` guard below).
        let tokio_runtime = tokio::runtime::Runtime::new().unwrap();
        let tokio_handle = tokio_runtime.handle().clone();

        // One `GlobalId::User(n)` per input, mapped to its index so output
        // statuses can be matched back to the input that produced them.
        let inputs: BTreeMap<GlobalId, usize> = (0..inputs)
            .map(|index| (GlobalId::User(u64::cast_from(index)), index))
            .collect();

        timely::execute::execute(
            timely::execute::Config {
                communication: timely::CommunicationConfig::Process(workers),
                worker: Default::default(),
            },
            move |worker| {
                let steps = steps.clone();
                let inputs = inputs.clone();

                let _tokio_guard = tokio_handle.enter();
                // `in_tx`/`in_rx`: test steps into the dataflow.
                // `out_tx`/`out_rx`: recorded statuses back out of the dataflow.
                let (in_tx, in_rx) = unbounded_channel();
                let (out_tx, mut out_rx) = unbounded_channel();

                worker.dataflow::<(), _, _>(|root_scope| {
                    root_scope
                        .clone()
                        .scoped::<mz_repr::Timestamp, _, _>("gus", |scope| {
                            let input = producer(root_scope.clone(), in_rx);
                            // NOTE(review): the operator's return value is
                            // intentionally leaked so it is never dropped,
                            // which would presumably shut the operator down
                            // mid-test — confirm against `health_operator`.
                            Box::leak(Box::new(health_operator(
                                scope,
                                mz_ore::now::SYSTEM_TIME.clone(),
                                inputs.keys().copied().collect(),
                                *inputs.first_key_value().unwrap().0,
                                "source_test",
                                &input,
                                TestWriter {
                                    sender: out_tx,
                                    input_mapping: inputs,
                                },
                                write_namespaced_map,
                                Duration::from_secs(5),
                            )));
                        });
                });

                // We arbitrarily do all the testing on the first worker.
                if worker.index() == 0 {
                    use Step::*;
                    for step in steps {
                        match step {
                            Update(update) => {
                                let _ = in_tx.send(update);
                            }
                            // Step the worker until every status expected by
                            // this assertion has been observed.
                            AssertStatus(mut statuses) => loop {
                                match out_rx.try_recv() {
                                    Err(_) => {
                                        worker.step();
                                        // This makes testing easier.
                                        std::thread::sleep(std::time::Duration::from_millis(50));
                                    }
                                    Ok(update) => {
                                        // Match the received status to the
                                        // expected one by collection index.
                                        let pos = statuses
                                            .iter()
                                            .position(|s| {
                                                s.collection_index == update.collection_index
                                            })
                                            .unwrap();

                                        let status_to_assert = &statuses[pos];
                                        assert_eq!(&update, status_to_assert);

                                        statuses.remove(pos);
                                        if statuses.is_empty() {
                                            break;
                                        }
                                    }
                                }
                            },
                        }
                    }

                    // Assert that nothing is left in the channel.
                    assert_err!(out_rx.try_recv());
                }
            },
        )
        .unwrap();
    }
1150
1151    /// Produces (input_index, HealthStatusUpdate)'s based on the input channel.
1152    ///
1153    /// Only the first worker is used, all others immediately drop their capabilities and channels.
1154    /// After the channel is empty on the first worker, then the frontier will go to [].
1155    /// Also ensures that updates are routed to the correct worker based on the `TestUpdate`
1156    /// using an exchange.
1157    fn producer<G: Scope<Timestamp = ()>>(
1158        scope: G,
1159        mut input: UnboundedReceiver<TestUpdate>,
1160    ) -> Stream<G, HealthStatusMessage> {
1161        let mut iterator = AsyncOperatorBuilder::new("iterator".to_string(), scope.clone());
1162        let (output_handle, output) = iterator.new_output::<CapacityContainerBuilder<Vec<_>>>();
1163
1164        let index = scope.index();
1165        iterator.build(|mut caps| async move {
1166            // We input and assert outputs on the first worker.
1167            if index != 0 {
1168                return;
1169            }
1170            let mut capability = Some(caps.pop().unwrap());
1171            while let Some(element) = input.recv().await {
1172                output_handle.give(
1173                    capability.as_ref().unwrap(),
1174                    (
1175                        element.worker_id,
1176                        element.id,
1177                        element.namespace,
1178                        element.update,
1179                    ),
1180                );
1181            }
1182
1183            capability.take();
1184        });
1185
1186        let output = output.exchange(|d| d.0).map(|d| HealthStatusMessage {
1187            id: d.1,
1188            namespace: d.2,
1189            update: d.3,
1190        });
1191
1192        output
1193    }
1194}