mz_storage/healthcheck.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Common healthcheck handling for storage dataflows.

use std::cell::RefCell;
use std::collections::btree_map::Entry;
use std::collections::{BTreeMap, BTreeSet};
use std::fmt;
use std::fmt::Debug;
use std::rc::Rc;
use std::time::Duration;

use chrono::{DateTime, Utc};
use differential_dataflow::Hashable;
use futures::StreamExt;
use mz_ore::cast::CastFrom;
use mz_ore::now::NowFn;
use mz_repr::GlobalId;
use mz_storage_client::client::{Status, StatusUpdate};
use mz_timely_util::builder_async::{
    Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
};
use serde::{Deserialize, Serialize};
use timely::dataflow::channels::pact::Exchange;
use timely::dataflow::operators::Map;
use timely::dataflow::{Scope, Stream};
use tracing::{error, info};

use crate::internal_control::{InternalCommandSender, InternalStorageCommand};

/// The namespace of the update. The `Ord` impl matters here: later variants are
/// displayed over earlier ones.
///
/// Some namespaces (referred to as "sidechannels") can come from any worker_id,
/// and `Running` statuses from them do not mark the entire object as running.
///
/// Ensure you update `is_sidechannel` when adding variants.
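///
/// For illustration (a sketch added here, not upstream documentation): because `Ord`
/// is derived, declaration order decides which namespace's error is displayed.
///
/// ```ignore
/// // Later variants sort (and therefore display) above earlier ones.
/// assert!(StatusNamespace::Kafka > StatusNamespace::Generator);
/// assert!(StatusNamespace::Internal > StatusNamespace::Ssh);
/// ```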
#[derive(Copy, Clone, Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub enum StatusNamespace {
    /// A normal status namespace. Any `Running` status from any worker will mark the object
    /// `Running`.
    Generator,
    Kafka,
    Postgres,
    MySql,
    SqlServer,
    Ssh,
    Upsert,
    Decode,
    Internal,
}

impl StatusNamespace {
    fn is_sidechannel(&self) -> bool {
        matches!(self, StatusNamespace::Ssh)
    }
}

impl fmt::Display for StatusNamespace {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        use StatusNamespace::*;
        match self {
            Generator => write!(f, "generator"),
            Kafka => write!(f, "kafka"),
            Postgres => write!(f, "postgres"),
            MySql => write!(f, "mysql"),
            SqlServer => write!(f, "sql-server"),
            Ssh => write!(f, "ssh"),
            Upsert => write!(f, "upsert"),
            Decode => write!(f, "decode"),
            Internal => write!(f, "internal"),
        }
    }
}

#[derive(Debug)]
struct PerWorkerHealthStatus {
    pub(crate) errors_by_worker: Vec<BTreeMap<StatusNamespace, HealthStatusUpdate>>,
}

impl PerWorkerHealthStatus {
    fn merge_update(
        &mut self,
        worker: usize,
        namespace: StatusNamespace,
        update: HealthStatusUpdate,
        only_greater: bool,
    ) {
        let errors = &mut self.errors_by_worker[worker];
        match errors.entry(namespace) {
            Entry::Vacant(v) => {
                v.insert(update);
            }
            Entry::Occupied(mut o) => {
                if !only_greater || o.get() < &update {
                    o.insert(update);
                }
            }
        }
    }

    fn decide_status(&self) -> OverallStatus {
        let mut output_status = OverallStatus::Starting;
        let mut namespaced_errors: BTreeMap<StatusNamespace, String> = BTreeMap::new();
        let mut hints: BTreeSet<String> = BTreeSet::new();

        for status in self.errors_by_worker.iter() {
            for (ns, ns_status) in status.iter() {
                match ns_status {
                    // HealthStatusUpdate::Ceased is currently unused, so just
                    // treat it as if it were a normal error.
                    //
                    // TODO: redesign ceased status database-issues#7687
                    HealthStatusUpdate::Ceased { error } => {
                        if Some(error) > namespaced_errors.get(ns).as_deref() {
                            namespaced_errors.insert(*ns, error.to_string());
                        }
                    }
                    HealthStatusUpdate::Stalled { error, hint, .. } => {
                        if Some(error) > namespaced_errors.get(ns).as_deref() {
                            namespaced_errors.insert(*ns, error.to_string());
                        }

                        if let Some(hint) = hint {
                            hints.insert(hint.to_string());
                        }
                    }
                    HealthStatusUpdate::Running => {
                        if !ns.is_sidechannel() {
                            output_status = OverallStatus::Running;
                        }
                    }
                }
            }
        }

        if !namespaced_errors.is_empty() {
            // Pick the most important error.
            let (ns, err) = namespaced_errors.last_key_value().unwrap();
            output_status = OverallStatus::Stalled {
                error: format!("{}: {}", ns, err),
                hints,
                namespaced_errors,
            }
        }

        output_status
    }
}
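
// An illustrative walkthrough of `decide_status` (added commentary, mirroring the
// unit tests below): if one worker reports `Generator: Stalled("uhoh")` and another
// reports `Kafka: Stalled("uhoh")`, the overall status is `Stalled` with the summary
// error "kafka: uhoh", because `Kafka` sorts after `Generator` and `last_key_value`
// picks the greatest namespace. The `Generator` error is still carried in
// `namespaced_errors`, and hints from all workers are unioned into one set.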

#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub enum OverallStatus {
    Starting,
    Running,
    Stalled {
        error: String,
        hints: BTreeSet<String>,
        namespaced_errors: BTreeMap<StatusNamespace, String>,
    },
    Ceased {
        error: String,
    },
}

impl OverallStatus {
    /// The user-readable error string, if there is one.
    pub(crate) fn error(&self) -> Option<&str> {
        match self {
            OverallStatus::Starting | OverallStatus::Running => None,
            OverallStatus::Stalled { error, .. } | OverallStatus::Ceased { error, .. } => {
                Some(error)
            }
        }
    }

    /// A set of namespaced errors, if there are any.
    pub(crate) fn errors(&self) -> Option<&BTreeMap<StatusNamespace, String>> {
        match self {
            OverallStatus::Starting | OverallStatus::Running | OverallStatus::Ceased { .. } => None,
            OverallStatus::Stalled {
                namespaced_errors, ..
            } => Some(namespaced_errors),
        }
    }

    /// A set of hints, if there are any.
    pub(crate) fn hints(&self) -> BTreeSet<String> {
        match self {
            OverallStatus::Starting | OverallStatus::Running | OverallStatus::Ceased { .. } => {
                BTreeSet::new()
            }
            OverallStatus::Stalled { hints, .. } => hints.clone(),
        }
    }
}

impl<'a> From<&'a OverallStatus> for Status {
    fn from(val: &'a OverallStatus) -> Self {
        match val {
            OverallStatus::Starting => Status::Starting,
            OverallStatus::Running => Status::Running,
            OverallStatus::Stalled { .. } => Status::Stalled,
            OverallStatus::Ceased { .. } => Status::Ceased,
        }
    }
}

#[derive(Debug)]
struct HealthState {
    healths: PerWorkerHealthStatus,
    last_reported_status: Option<OverallStatus>,
    halt_with: Option<(StatusNamespace, HealthStatusUpdate)>,
}

impl HealthState {
    fn new(worker_count: usize) -> HealthState {
        HealthState {
            healths: PerWorkerHealthStatus {
                errors_by_worker: vec![Default::default(); worker_count],
            },
            last_reported_status: None,
            halt_with: None,
        }
    }
}

/// A trait that lets a user configure the `health_operator` with custom
/// behavior. This is mostly useful for testing; outside of tests, the
/// [`DefaultWriter`] is the implementation to use.
pub trait HealthOperator {
    /// Record a new status.
    fn record_new_status(
        &self,
        collection_id: GlobalId,
        ts: DateTime<Utc>,
        new_status: Status,
        new_error: Option<&str>,
        hints: &BTreeSet<String>,
        namespaced_errors: &BTreeMap<StatusNamespace, String>,
        // TODO(guswynn): not urgent:
        // Ideally this would be entirely included in the `DefaultWriter`, but that
        // requires a fairly heavy change to the `health_operator`, which hardcodes
        // some use of persist. For now we just leave it and ignore it in tests.
        write_namespaced_map: bool,
    );
    fn send_halt(&self, id: GlobalId, error: Option<(StatusNamespace, HealthStatusUpdate)>);

    /// Optionally override the chosen worker index. Default is semi-random.
    /// Only useful for tests.
    fn chosen_worker(&self) -> Option<usize> {
        None
    }
}

/// A default `HealthOperator` for use in normal cases.
pub struct DefaultWriter {
    pub command_tx: InternalCommandSender,
    pub updates: Rc<RefCell<Vec<StatusUpdate>>>,
}

impl HealthOperator for DefaultWriter {
    fn record_new_status(
        &self,
        collection_id: GlobalId,
        ts: DateTime<Utc>,
        status: Status,
        new_error: Option<&str>,
        hints: &BTreeSet<String>,
        namespaced_errors: &BTreeMap<StatusNamespace, String>,
        write_namespaced_map: bool,
    ) {
        self.updates.borrow_mut().push(StatusUpdate {
            id: collection_id,
            timestamp: ts,
            status,
            error: new_error.map(|e| e.to_string()),
            hints: hints.clone(),
            namespaced_errors: if write_namespaced_map {
                namespaced_errors
                    .iter()
                    .map(|(ns, val)| (ns.to_string(), val.clone()))
                    .collect()
            } else {
                BTreeMap::new()
            },
            replica_id: None,
        });
    }

    fn send_halt(&self, id: GlobalId, error: Option<(StatusNamespace, HealthStatusUpdate)>) {
        self.command_tx
            .send(InternalStorageCommand::SuspendAndRestart {
                // Suspend and restart is expected to operate on the primary object and
                // not any of the sub-objects
                id,
                reason: format!("{:?}", error),
            });
    }
}

/// A health message consumed by the `health_operator`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub struct HealthStatusMessage {
    /// The object that this status message is about. When None, it refers to the entire ingestion
    /// as a whole. When Some, it refers to a specific subsource.
    pub id: Option<GlobalId>,
    /// The namespace of the health update.
    pub namespace: StatusNamespace,
    /// The update itself.
    pub update: HealthStatusUpdate,
}
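
// For illustration only (hypothetical usage, not taken from a caller in this file):
// a source implementation might feed messages like the following into `health_stream`.
//
//     HealthStatusMessage {
//         // `None` targets the top-level object; `Some(id)` targets a subsource.
//         id: None,
//         namespace: StatusNamespace::Kafka,
//         update: HealthStatusUpdate::stalled("broker unreachable".to_string(), None),
//     };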

/// Writes updates that come across `health_stream` to the collection's status shards, as identified
/// by their `CollectionMetadata`.
///
/// Only one worker will be active and write to the status shard.
///
/// The `GlobalId`s that come across `health_stream` should be a subset of those in
/// `mark_starting` (plus `halting_id`); updates for untracked ids are ignored.
pub(crate) fn health_operator<G, P>(
    scope: &G,
    now: NowFn,
    // A set of ids that should be marked as `Starting` during startup.
    mark_starting: BTreeSet<GlobalId>,
    // An id that is allowed to halt the dataflow. Halt requests from other ids are
    // ignored (and panic in debug builds).
    halting_id: GlobalId,
    // A description of the object type we are writing status updates about. Used in log lines.
    object_type: &'static str,
    // A stream of health updates for the objects tracked in `mark_starting` and `halting_id`.
    health_stream: &Stream<G, HealthStatusMessage>,
    // An impl of `HealthOperator` that configures the output behavior of this operator.
    health_operator_impl: P,
    // Whether or not we should actually write namespaced errors in the `details` column.
    write_namespaced_map: bool,
    // How long to wait before initiating a `SuspendAndRestart` command, to
    // prevent hot restart loops.
    suspend_and_restart_delay: Duration,
) -> PressOnDropButton
where
    G: Scope,
    P: HealthOperator + 'static,
{
    // Derived config options
    let healthcheck_worker_id = scope.index();
    let worker_count = scope.peers();

    // Inject the originating worker id to each item before exchanging to the chosen worker
    let health_stream = health_stream.map(move |status| (healthcheck_worker_id, status));

    let chosen_worker_id = if let Some(index) = health_operator_impl.chosen_worker() {
        index
    } else {
        // We'll route all the work to a single arbitrary worker;
        // there's not much to do, and we need a global view.
        usize::cast_from(mark_starting.iter().next().hashed()) % worker_count
    };

    let is_active_worker = chosen_worker_id == healthcheck_worker_id;

    let operator_name = format!("healthcheck({})", healthcheck_worker_id);
    let mut health_op = AsyncOperatorBuilder::new(operator_name, scope.clone());

    let mut input = health_op.new_disconnected_input(
        &health_stream,
        Exchange::new(move |_| u64::cast_from(chosen_worker_id)),
    );

    let button = health_op.build(move |mut _capabilities| async move {
        let mut health_states: BTreeMap<_, _> = mark_starting
            .iter()
            .copied()
            .chain([halting_id])
            .map(|id| (id, HealthState::new(worker_count)))
            .collect();

        // Write the initial starting state to the status shard for all managed objects
        if is_active_worker {
            for (id, state) in health_states.iter_mut() {
                let status = OverallStatus::Starting;
                let timestamp = mz_ore::now::to_datetime(now());
                health_operator_impl.record_new_status(
                    *id,
                    timestamp,
                    (&status).into(),
                    status.error(),
                    &status.hints(),
                    status.errors().unwrap_or(&BTreeMap::new()),
                    write_namespaced_map,
                );

                state.last_reported_status = Some(status);
            }
        }

        let mut outputs_seen = BTreeMap::<GlobalId, BTreeSet<_>>::new();
        while let Some(event) = input.next().await {
            if let AsyncEvent::Data(_cap, rows) = event {
                for (worker_id, message) in rows {
                    let HealthStatusMessage {
                        id,
                        namespace: ns,
                        update: health_event,
                    } = message;
                    let id = id.unwrap_or(halting_id);
                    let HealthState {
                        healths, halt_with, ..
                    } = match health_states.get_mut(&id) {
                        Some(health) => health,
                        // This is a health status update for a sub-object_type that we did not request to
                        // be generated, which means it doesn't have a GlobalId and should not be
                        // propagated to the shard.
                        None => continue,
                    };

                    // It's important to track `new_round` per-namespace, so namespaces are reasoned
                    // about independently in `merge_update`.
                    let new_round = outputs_seen
                        .entry(id)
                        .or_insert_with(BTreeSet::new)
                        .insert(ns.clone());

                    if !is_active_worker {
                        error!(
                            "Health messages for {object_type} {id} passed to \
                              an unexpected worker id: {healthcheck_worker_id}"
                        )
                    }

                    if health_event.should_halt() {
                        *halt_with = Some((ns.clone(), health_event.clone()));
                    }

                    healths.merge_update(worker_id, ns, health_event, !new_round);
                }

                let mut halt_with_outer = None;

                while let Some((id, _)) = outputs_seen.pop_first() {
                    let HealthState {
                        healths,
                        last_reported_status,
                        halt_with,
                    } = health_states.get_mut(&id).expect("known to exist");

                    let new_status = healths.decide_status();

                    if Some(&new_status) != last_reported_status.as_ref() {
                        info!(
                            "Health transition for {object_type} {id}: \
                                  {last_reported_status:?} -> {:?}",
                            Some(&new_status)
                        );

                        let timestamp = mz_ore::now::to_datetime(now());
                        health_operator_impl.record_new_status(
                            id,
                            timestamp,
                            (&new_status).into(),
                            new_status.error(),
                            &new_status.hints(),
                            new_status.errors().unwrap_or(&BTreeMap::new()),
                            write_namespaced_map,
                        );

                        *last_reported_status = Some(new_status.clone());
                    }

                    // Set halt with if None.
                    if halt_with_outer.is_none() && halt_with.is_some() {
                        halt_with_outer = Some((id, halt_with.clone()));
                    }
                }

                // TODO(aljoscha): Instead of threading through the
                // `should_halt` bit, we can give an internal command sender
                // directly to the places where `should_halt = true` originates.
                // We should definitely do that, but this is okay for a PoC.
                if let Some((id, halt_with)) = halt_with_outer {
                    mz_ore::soft_assert_or_log!(
                        id == halting_id,
                        "sub{object_type}s should not produce \
                        halting errors, however {:?} halted while primary \
                                            {object_type} is {:?}",
                        id,
                        halting_id
                    );

                    info!(
                        "Broadcasting suspend-and-restart \
                        command because of {:?} after {:?} delay",
                        halt_with, suspend_and_restart_delay
                    );
                    tokio::time::sleep(suspend_and_restart_delay).await;
                    health_operator_impl.send_halt(id, halt_with);
                }
            }
        }
    });

    button.press_on_drop()
}

/// NB: we derive Ord here, so the enum order matters. Generally, statuses later in the list
/// take precedence over earlier ones: so if one worker is stalled, we'll consider the entire
/// source to be stalled.
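///
/// A minimal sketch of that precedence (added here as an illustration, relying only
/// on the derived `Ord`):
///
/// ```ignore
/// let running = HealthStatusUpdate::running();
/// let stalled = HealthStatusUpdate::stalled("uhoh".to_string(), None);
/// assert!(stalled > running);
/// ```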
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub enum HealthStatusUpdate {
    Running,
    Stalled {
        error: String,
        hint: Option<String>,
        should_halt: bool,
    },
    Ceased {
        error: String,
    },
}

impl HealthStatusUpdate {
    /// Generates a running [`HealthStatusUpdate`].
    pub(crate) fn running() -> Self {
        HealthStatusUpdate::Running
    }

    /// Generates a non-halting [`HealthStatusUpdate`] with the given error and hint.
    pub(crate) fn stalled(error: String, hint: Option<String>) -> Self {
        HealthStatusUpdate::Stalled {
            error,
            hint,
            should_halt: false,
        }
    }

    /// Generates a halting [`HealthStatusUpdate`] with the given error and hint.
    pub(crate) fn halting(error: String, hint: Option<String>) -> Self {
        HealthStatusUpdate::Stalled {
            error,
            hint,
            should_halt: true,
        }
    }

    // TODO: redesign ceased status database-issues#7687
    // Generates a ceasing [`HealthStatusUpdate`] with the given error.
    // pub(crate) fn ceasing(error: String) -> Self {
    //     HealthStatusUpdate::Ceased { error }
    // }

    /// Whether or not we should halt the dataflow instance and restart it.
    pub(crate) fn should_halt(&self) -> bool {
        match self {
            HealthStatusUpdate::Running |
            // HealthStatusUpdate::Ceased should never halt because it can occur
            // at the subsource level and should not cause the entire dataflow
            // to halt. Instead, the dataflow itself should handle shutting
            // itself down if need be.
            HealthStatusUpdate::Ceased { .. } => false,
            HealthStatusUpdate::Stalled { should_halt, .. } => *should_halt,
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use itertools::Itertools;

    // Actual timely tests for `health_operator`.

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
    fn test_health_operator_basic() {
        use Step::*;

        // Test 2 inputs across 2 workers.
        health_operator_runner(
            2,
            2,
            true,
            vec![
                AssertStatus(vec![
                    // Assert both inputs started.
                    StatusToAssert {
                        collection_index: 0,
                        status: Status::Starting,
                        ..Default::default()
                    },
                    StatusToAssert {
                        collection_index: 1,
                        status: Status::Starting,
                        ..Default::default()
                    },
                ]),
                // Update and assert one is running.
                Update(TestUpdate {
                    worker_id: 1,
                    namespace: StatusNamespace::Generator,
                    id: None,
                    update: HealthStatusUpdate::running(),
                }),
                AssertStatus(vec![StatusToAssert {
                    collection_index: 0,
                    status: Status::Running,
                    ..Default::default()
                }]),
                // Assert the other can be stalled by 1 worker.
                //
                // TODO(guswynn): ideally we could push these updates
                // at the same time, but because they are coming from separate
                // workers, they could end up in different rounds, causing flakes.
                // For now, we just do this.
                Update(TestUpdate {
                    worker_id: 1,
                    namespace: StatusNamespace::Generator,
                    id: Some(GlobalId::User(1)),
                    update: HealthStatusUpdate::running(),
                }),
                AssertStatus(vec![StatusToAssert {
                    collection_index: 1,
                    status: Status::Running,
                    ..Default::default()
                }]),
                Update(TestUpdate {
                    worker_id: 0,
                    namespace: StatusNamespace::Generator,
                    id: Some(GlobalId::User(1)),
                    update: HealthStatusUpdate::stalled("uhoh".to_string(), None),
                }),
                AssertStatus(vec![StatusToAssert {
                    collection_index: 1,
                    status: Status::Stalled,
                    error: Some("generator: uhoh".to_string()),
                    errors: Some("generator: uhoh".to_string()),
                    ..Default::default()
                }]),
                // And that it can recover.
                Update(TestUpdate {
                    worker_id: 0,
                    namespace: StatusNamespace::Generator,
                    id: Some(GlobalId::User(1)),
                    update: HealthStatusUpdate::running(),
                }),
                AssertStatus(vec![StatusToAssert {
                    collection_index: 1,
                    status: Status::Running,
                    ..Default::default()
                }]),
            ],
        );
    }

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
    fn test_health_operator_write_namespaced_map() {
        use Step::*;

        // Test 2 inputs across 2 workers.
        health_operator_runner(
            2,
            2,
            // `write_namespaced_map = false` is what this test exercises.
            false,
            vec![
                AssertStatus(vec![
                    // Assert both inputs started.
                    StatusToAssert {
                        collection_index: 0,
                        status: Status::Starting,
                        ..Default::default()
                    },
                    StatusToAssert {
                        collection_index: 1,
                        status: Status::Starting,
                        ..Default::default()
                    },
                ]),
                Update(TestUpdate {
                    worker_id: 0,
                    namespace: StatusNamespace::Generator,
                    id: Some(GlobalId::User(1)),
                    update: HealthStatusUpdate::stalled("uhoh".to_string(), None),
                }),
                AssertStatus(vec![StatusToAssert {
                    collection_index: 1,
                    status: Status::Stalled,
                    error: Some("generator: uhoh".to_string()),
                    errors: None,
                    ..Default::default()
                }]),
            ],
        )
    }

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
    fn test_health_operator_namespaces() {
        use Step::*;

        // Test 1 input across 2 workers.
        health_operator_runner(
            2,
            1,
            true,
            vec![
                AssertStatus(vec![
                    // Assert the input started.
                    StatusToAssert {
                        collection_index: 0,
                        status: Status::Starting,
                        ..Default::default()
                    },
                ]),
                // Assert that we merge namespaced errors correctly.
                //
                // Note that these all happen on the same worker id.
                Update(TestUpdate {
                    worker_id: 0,
                    namespace: StatusNamespace::Generator,
                    id: None,
                    update: HealthStatusUpdate::stalled("uhoh".to_string(), None),
                }),
                AssertStatus(vec![StatusToAssert {
                    collection_index: 0,
                    status: Status::Stalled,
                    error: Some("generator: uhoh".to_string()),
                    errors: Some("generator: uhoh".to_string()),
                    ..Default::default()
                }]),
                Update(TestUpdate {
                    worker_id: 0,
                    namespace: StatusNamespace::Kafka,
                    id: None,
                    update: HealthStatusUpdate::stalled("uhoh".to_string(), None),
                }),
                AssertStatus(vec![StatusToAssert {
                    collection_index: 0,
                    status: Status::Stalled,
                    error: Some("kafka: uhoh".to_string()),
                    errors: Some("generator: uhoh, kafka: uhoh".to_string()),
                    ..Default::default()
                }]),
                // And that it can recover.
                Update(TestUpdate {
                    worker_id: 0,
                    namespace: StatusNamespace::Kafka,
                    id: None,
                    update: HealthStatusUpdate::running(),
                }),
                AssertStatus(vec![StatusToAssert {
                    collection_index: 0,
                    status: Status::Stalled,
                    error: Some("generator: uhoh".to_string()),
                    errors: Some("generator: uhoh".to_string()),
                    ..Default::default()
                }]),
                Update(TestUpdate {
                    worker_id: 0,
                    namespace: StatusNamespace::Generator,
                    id: None,
                    update: HealthStatusUpdate::running(),
                }),
                AssertStatus(vec![StatusToAssert {
                    collection_index: 0,
                    status: Status::Running,
                    ..Default::default()
                }]),
            ],
        );
    }

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
    fn test_health_operator_namespace_side_channel() {
        use Step::*;

        health_operator_runner(
            2,
            1,
            true,
            vec![
                AssertStatus(vec![
                    // Assert the input started.
                    StatusToAssert {
                        collection_index: 0,
                        status: Status::Starting,
                        ..Default::default()
                    },
                ]),
                // Assert that sidechannel namespaces can stall the object but cannot
                // mark it as running.
                //
                // Note that these all happen on the same worker id.
                Update(TestUpdate {
                    worker_id: 0,
                    namespace: StatusNamespace::Ssh,
                    id: None,
                    update: HealthStatusUpdate::stalled("uhoh".to_string(), None),
                }),
                AssertStatus(vec![StatusToAssert {
                    collection_index: 0,
                    status: Status::Stalled,
                    error: Some("ssh: uhoh".to_string()),
                    errors: Some("ssh: uhoh".to_string()),
                    ..Default::default()
                }]),
                Update(TestUpdate {
                    worker_id: 0,
                    namespace: StatusNamespace::Ssh,
                    id: None,
                    update: HealthStatusUpdate::stalled("uhoh2".to_string(), None),
                }),
                AssertStatus(vec![StatusToAssert {
                    collection_index: 0,
                    status: Status::Stalled,
                    error: Some("ssh: uhoh2".to_string()),
                    errors: Some("ssh: uhoh2".to_string()),
                    ..Default::default()
                }]),
                Update(TestUpdate {
                    worker_id: 0,
                    namespace: StatusNamespace::Ssh,
                    id: None,
                    update: HealthStatusUpdate::running(),
                }),
                // We haven't started running yet, because no non-sidechannel namespace
                // has reported `Running`.
                AssertStatus(vec![StatusToAssert {
                    collection_index: 0,
                    status: Status::Starting,
                    ..Default::default()
                }]),
                Update(TestUpdate {
                    worker_id: 0,
                    namespace: StatusNamespace::Generator,
                    id: None,
                    update: HealthStatusUpdate::running(),
                }),
                AssertStatus(vec![StatusToAssert {
                    collection_index: 0,
                    status: Status::Running,
                    ..Default::default()
                }]),
            ],
        );
    }

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
    fn test_health_operator_hints() {
        use Step::*;

        health_operator_runner(
            2,
            1,
            true,
            vec![
                AssertStatus(vec![
                    // Assert the input started.
                    StatusToAssert {
                        collection_index: 0,
                        status: Status::Starting,
                        ..Default::default()
                    },
                ]),
                // Note that these all happen across worker ids.
                Update(TestUpdate {
                    worker_id: 0,
                    namespace: StatusNamespace::Generator,
                    id: None,
                    update: HealthStatusUpdate::stalled(
                        "uhoh".to_string(),
                        Some("hint1".to_string()),
                    ),
                }),
                AssertStatus(vec![StatusToAssert {
                    collection_index: 0,
                    status: Status::Stalled,
                    error: Some("generator: uhoh".to_string()),
                    errors: Some("generator: uhoh".to_string()),
                    hint: Some("hint1".to_string()),
                }]),
                Update(TestUpdate {
                    worker_id: 1,
                    namespace: StatusNamespace::Generator,
                    id: None,
                    update: HealthStatusUpdate::stalled(
                        "uhoh2".to_string(),
                        Some("hint2".to_string()),
                    ),
                }),
                AssertStatus(vec![StatusToAssert {
                    collection_index: 0,
                    status: Status::Stalled,
                    // Note the error sorts later so we just use that.
                    error: Some("generator: uhoh2".to_string()),
                    errors: Some("generator: uhoh2".to_string()),
                    hint: Some("hint1, hint2".to_string()),
                }]),
                // Update one of the hints
                Update(TestUpdate {
                    worker_id: 1,
                    namespace: StatusNamespace::Generator,
                    id: None,
                    update: HealthStatusUpdate::stalled(
                        "uhoh2".to_string(),
                        Some("hint3".to_string()),
                    ),
                }),
                AssertStatus(vec![StatusToAssert {
                    collection_index: 0,
                    status: Status::Stalled,
                    // Note the error sorts later so we just use that.
                    error: Some("generator: uhoh2".to_string()),
                    errors: Some("generator: uhoh2".to_string()),
                    hint: Some("hint1, hint3".to_string()),
                }]),
                // Assert recovery.
                Update(TestUpdate {
                    worker_id: 0,
                    namespace: StatusNamespace::Generator,
                    id: None,
                    update: HealthStatusUpdate::running(),
                }),
                AssertStatus(vec![StatusToAssert {
                    collection_index: 0,
                    status: Status::Stalled,
                    // Note the error sorts later so we just use that.
                    error: Some("generator: uhoh2".to_string()),
                    errors: Some("generator: uhoh2".to_string()),
                    hint: Some("hint3".to_string()),
                }]),
                Update(TestUpdate {
                    worker_id: 1,
                    namespace: StatusNamespace::Generator,
                    id: None,
                    update: HealthStatusUpdate::running(),
                }),
                AssertStatus(vec![StatusToAssert {
                    collection_index: 0,
                    status: Status::Running,
                    ..Default::default()
                }]),
            ],
        );
    }

    // Everything below is test infrastructure for the tests above.

    use mz_ore::assert_err;
    use timely::container::CapacityContainerBuilder;
    use timely::dataflow::Scope;
    use timely::dataflow::operators::Enter;
    use timely::dataflow::operators::exchange::Exchange;
    use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};

    /// A status to assert.
    #[derive(Debug, Clone, PartialEq, Eq)]
    struct StatusToAssert {
        collection_index: usize,
        status: Status,
        error: Option<String>,
        errors: Option<String>,
        hint: Option<String>,
    }

    impl Default for StatusToAssert {
        fn default() -> Self {
            StatusToAssert {
                collection_index: Default::default(),
                status: Status::Running,
                error: Default::default(),
                errors: Default::default(),
                hint: Default::default(),
            }
        }
    }

    /// An update to push into the operator.
    /// Can come from any worker, and from any input.
    #[derive(Debug, Clone)]
    struct TestUpdate {
        worker_id: u64,
        namespace: StatusNamespace,
        id: Option<GlobalId>,
        update: HealthStatusUpdate,
    }

    #[derive(Debug, Clone)]
    enum Step {
        /// Insert a new health update.
        Update(TestUpdate),
        /// Assert a set of outputs. Note that these should
        /// have unique `collection_index` values.
        AssertStatus(Vec<StatusToAssert>),
    }

    struct TestWriter {
        sender: UnboundedSender<StatusToAssert>,
        input_mapping: BTreeMap<GlobalId, usize>,
    }

    impl HealthOperator for TestWriter {
        fn record_new_status(
            &self,
            collection_id: GlobalId,
            _ts: DateTime<Utc>,
            status: Status,
            new_error: Option<&str>,
            hints: &BTreeSet<String>,
            namespaced_errors: &BTreeMap<StatusNamespace, String>,
            write_namespaced_map: bool,
        ) {
            let _ = self.sender.send(StatusToAssert {
                collection_index: *self.input_mapping.get(&collection_id).unwrap(),
                status,
                error: new_error.map(str::to_string),
                errors: if !namespaced_errors.is_empty() && write_namespaced_map {
                    Some(
                        namespaced_errors
                            .iter()
                            .map(|(ns, err)| format!("{}: {}", ns, err))
                            .join(", "),
                    )
                } else {
                    None
                },
                hint: if !hints.is_empty() {
                    Some(hints.iter().join(", "))
                } else {
                    None
                },
            });
        }

        fn send_halt(&self, _id: GlobalId, _error: Option<(StatusNamespace, HealthStatusUpdate)>) {
            // Not yet unit-tested
            unimplemented!()
        }

        fn chosen_worker(&self) -> Option<usize> {
            // We input and assert outputs on the first worker.
            Some(0)
        }
    }

    /// Set up a `health_operator` with a set number of workers and inputs, and run the
    /// given steps on the first worker.
    fn health_operator_runner(
        workers: usize,
        inputs: usize,
        write_namespaced_map: bool,
        steps: Vec<Step>,
    ) {
        let tokio_runtime = tokio::runtime::Runtime::new().unwrap();
        let tokio_handle = tokio_runtime.handle().clone();

        let inputs: BTreeMap<GlobalId, usize> = (0..inputs)
            .map(|index| (GlobalId::User(u64::cast_from(index)), index))
            .collect();

        timely::execute::execute(
            timely::execute::Config {
                communication: timely::CommunicationConfig::Process(workers),
                worker: Default::default(),
            },
            move |worker| {
                let steps = steps.clone();
                let inputs = inputs.clone();

                let _tokio_guard = tokio_handle.enter();
                let (in_tx, in_rx) = unbounded_channel();
                let (out_tx, mut out_rx) = unbounded_channel();

                worker.dataflow::<(), _, _>(|root_scope| {
                    root_scope
                        .clone()
                        .scoped::<mz_repr::Timestamp, _, _>("gus", |scope| {
                            let input = producer(root_scope.clone(), in_rx).enter(scope);
                            Box::leak(Box::new(health_operator(
                                scope,
                                mz_ore::now::SYSTEM_TIME.clone(),
                                inputs.keys().copied().collect(),
                                *inputs.first_key_value().unwrap().0,
                                "source_test",
                                &input,
                                TestWriter {
                                    sender: out_tx,
                                    input_mapping: inputs,
                                },
                                write_namespaced_map,
                                Duration::from_secs(5),
                            )));
                        });
                });

                // We arbitrarily do all the testing on the first worker.
                if worker.index() == 0 {
                    use Step::*;
                    for step in steps {
                        match step {
                            Update(update) => {
                                let _ = in_tx.send(update);
                            }
                            AssertStatus(mut statuses) => loop {
                                match out_rx.try_recv() {
                                    Err(_) => {
                                        worker.step();
                                        // This makes testing easier.
                                        std::thread::sleep(std::time::Duration::from_millis(50));
                                    }
                                    Ok(update) => {
                                        let pos = statuses
                                            .iter()
                                            .position(|s| {
                                                s.collection_index == update.collection_index
                                            })
                                            .unwrap();

                                        let status_to_assert = &statuses[pos];
                                        assert_eq!(&update, status_to_assert);

                                        statuses.remove(pos);
                                        if statuses.is_empty() {
                                            break;
                                        }
                                    }
                                }
                            },
                        }
                    }

                    // Assert that nothing is left in the channel.
                    assert_err!(out_rx.try_recv());
                }
            },
        )
        .unwrap();
    }

    /// Produces `HealthStatusMessage`s from the input channel.
    ///
    /// Only the first worker is used; all others immediately drop their capabilities and channels.
    /// Once the channel is drained on the first worker, the frontier advances to the empty frontier.
    /// Also ensures that updates are routed to the correct worker based on the `TestUpdate`'s
    /// `worker_id`, using an exchange.
    fn producer<G: Scope<Timestamp = ()>>(
        scope: G,
        mut input: UnboundedReceiver<TestUpdate>,
    ) -> Stream<G, HealthStatusMessage> {
        let mut iterator = AsyncOperatorBuilder::new("iterator".to_string(), scope.clone());
        let (output_handle, output) = iterator.new_output::<CapacityContainerBuilder<Vec<_>>>();

        let index = scope.index();
        iterator.build(|mut caps| async move {
            // We input and assert outputs on the first worker.
            if index != 0 {
                return;
            }
            let mut capability = Some(caps.pop().unwrap());
            while let Some(element) = input.recv().await {
                output_handle.give(
                    capability.as_ref().unwrap(),
                    (
                        element.worker_id,
                        element.id,
                        element.namespace,
                        element.update,
                    ),
                );
            }

            capability.take();
        });

        let output = output.exchange(|d| d.0).map(|d| HealthStatusMessage {
            id: d.1,
            namespace: d.2,
            update: d.3,
        });

        output
    }
}