Skip to main content

mz_storage/
healthcheck.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
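//! Common health-check types and the `health_operator` dataflow operator that
//! aggregates per-worker health updates and reports status changes.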
11
12use std::cell::RefCell;
13use std::collections::btree_map::Entry;
14use std::collections::{BTreeMap, BTreeSet};
15use std::fmt;
16use std::fmt::Debug;
17use std::rc::Rc;
18use std::time::Duration;
19
20use chrono::{DateTime, Utc};
21use differential_dataflow::Hashable;
22use futures::StreamExt;
23use mz_ore::cast::CastFrom;
24use mz_ore::now::NowFn;
25use mz_repr::GlobalId;
26use mz_storage_client::client::{Status, StatusUpdate};
27use mz_timely_util::builder_async::{
28    Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
29};
30use timely::dataflow::channels::pact::Exchange;
31use timely::dataflow::operators::Map;
32use timely::dataflow::{Scope, Stream};
33use tracing::{error, info};
34
35use crate::internal_control::{InternalCommandSender, InternalStorageCommand};
36
/// The namespace of a health update, i.e. the component the update is
/// attributed to.
///
/// The derived `Ord` impl matters here: variants compare in declaration order,
/// and errors from later variants are displayed over errors from earlier ones.
///
/// Some namespaces (referred to as "sidechannels") can come from any worker_id,
/// and `Running` statuses from them do not mark the entire object as running.
///
/// Ensure you update `is_sidechannel` when adding variants.
#[derive(
    Copy,
    Clone,
    Debug,
    Serialize,
    Deserialize,
    PartialEq,
    Eq,
    PartialOrd,
    Ord
)]
pub enum StatusNamespace {
    /// A normal status namespace. Any `Running` status from any worker will mark the object
    /// `Running`. Unless noted otherwise, the variants below are normal namespaces too.
    Generator,
    Kafka,
    Postgres,
    MySql,
    SqlServer,
    /// Currently the only namespace that `is_sidechannel` reports as a sidechannel.
    Ssh,
    Upsert,
    Decode,
    Iceberg,
    Internal,
}
69
70impl StatusNamespace {
71    fn is_sidechannel(&self) -> bool {
72        matches!(self, StatusNamespace::Ssh)
73    }
74}
75
76impl fmt::Display for StatusNamespace {
77    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
78        use StatusNamespace::*;
79        match self {
80            Generator => write!(f, "generator"),
81            Kafka => write!(f, "kafka"),
82            Postgres => write!(f, "postgres"),
83            MySql => write!(f, "mysql"),
84            SqlServer => write!(f, "sql-server"),
85            Ssh => write!(f, "ssh"),
86            Upsert => write!(f, "upsert"),
87            Decode => write!(f, "decode"),
88            Internal => write!(f, "internal"),
89            Iceberg => write!(f, "iceberg"),
90        }
91    }
92}
93
/// The most recently merged health update per namespace, tracked separately
/// for each worker.
#[derive(Debug)]
struct PerWorkerHealthStatus {
    /// Indexed by worker id; each entry maps a namespace to the update
    /// currently retained for that worker and namespace. Despite the name,
    /// `Running` updates are stored here as well.
    pub(crate) errors_by_worker: Vec<BTreeMap<StatusNamespace, HealthStatusUpdate>>,
}
98
99impl PerWorkerHealthStatus {
100    fn merge_update(
101        &mut self,
102        worker: usize,
103        namespace: StatusNamespace,
104        update: HealthStatusUpdate,
105        only_greater: bool,
106    ) {
107        let errors = &mut self.errors_by_worker[worker];
108        match errors.entry(namespace) {
109            Entry::Vacant(v) => {
110                v.insert(update);
111            }
112            Entry::Occupied(mut o) => {
113                if !only_greater || o.get() < &update {
114                    o.insert(update);
115                }
116            }
117        }
118    }
119
120    fn decide_status(&self) -> OverallStatus {
121        let mut output_status = OverallStatus::Starting;
122        let mut namespaced_errors: BTreeMap<StatusNamespace, String> = BTreeMap::new();
123        let mut hints: BTreeSet<String> = BTreeSet::new();
124
125        for status in self.errors_by_worker.iter() {
126            for (ns, ns_status) in status.iter() {
127                match ns_status {
128                    // HealthStatusUpdate::Ceased is currently unused, so just
129                    // treat it as if it were a normal error.
130                    //
131                    // TODO: redesign ceased status database-issues#7687
132                    HealthStatusUpdate::Ceased { error } => {
133                        if Some(error) > namespaced_errors.get(ns).as_deref() {
134                            namespaced_errors.insert(*ns, error.to_string());
135                        }
136                    }
137                    HealthStatusUpdate::Stalled { error, hint, .. } => {
138                        if Some(error) > namespaced_errors.get(ns).as_deref() {
139                            namespaced_errors.insert(*ns, error.to_string());
140                        }
141
142                        if let Some(hint) = hint {
143                            hints.insert(hint.to_string());
144                        }
145                    }
146                    HealthStatusUpdate::Running => {
147                        if !ns.is_sidechannel() {
148                            output_status = OverallStatus::Running;
149                        }
150                    }
151                }
152            }
153        }
154
155        if !namespaced_errors.is_empty() {
156            // Pick the most important error.
157            let (ns, err) = namespaced_errors.last_key_value().unwrap();
158            output_status = OverallStatus::Stalled {
159                error: format!("{}: {}", ns, err),
160                hints,
161                namespaced_errors,
162            }
163        }
164
165        output_status
166    }
167}
168
/// The aggregated status of a single object, as produced by
/// `PerWorkerHealthStatus::decide_status`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub enum OverallStatus {
    /// No error and no non-sidechannel `Running` update has been observed.
    Starting,
    /// At least one non-sidechannel namespace reported `Running` and no
    /// namespace currently reports an error.
    Running,
    /// At least one namespace currently reports an error.
    Stalled {
        /// The headline error, formatted as `"<namespace>: <error>"`.
        error: String,
        /// All hints attached to the currently retained `Stalled` updates.
        hints: BTreeSet<String>,
        /// The retained error string for every namespace that has one.
        namespaced_errors: BTreeMap<StatusNamespace, String>,
    },
    /// NOTE(review): not constructed anywhere in the visible code; presumably
    /// reserved for the ceased-status redesign (database-issues#7687).
    Ceased {
        error: String,
    },
}
182
183impl OverallStatus {
184    /// The user-readable error string, if there is one.
185    pub(crate) fn error(&self) -> Option<&str> {
186        match self {
187            OverallStatus::Starting | OverallStatus::Running => None,
188            OverallStatus::Stalled { error, .. } | OverallStatus::Ceased { error, .. } => {
189                Some(error)
190            }
191        }
192    }
193
194    /// A set of namespaced errors, if there are any.
195    pub(crate) fn errors(&self) -> Option<&BTreeMap<StatusNamespace, String>> {
196        match self {
197            OverallStatus::Starting | OverallStatus::Running | OverallStatus::Ceased { .. } => None,
198            OverallStatus::Stalled {
199                namespaced_errors, ..
200            } => Some(namespaced_errors),
201        }
202    }
203
204    /// A set of hints, if there are any.
205    pub(crate) fn hints(&self) -> BTreeSet<String> {
206        match self {
207            OverallStatus::Starting | OverallStatus::Running | OverallStatus::Ceased { .. } => {
208                BTreeSet::new()
209            }
210            OverallStatus::Stalled { hints, .. } => hints.clone(),
211        }
212    }
213}
214
215impl<'a> From<&'a OverallStatus> for Status {
216    fn from(val: &'a OverallStatus) -> Self {
217        match val {
218            OverallStatus::Starting => Status::Starting,
219            OverallStatus::Running => Status::Running,
220            OverallStatus::Stalled { .. } => Status::Stalled,
221            OverallStatus::Ceased { .. } => Status::Ceased,
222        }
223    }
224}
225
/// Everything the health operator tracks for a single object.
#[derive(Debug)]
struct HealthState {
    /// The retained per-worker, per-namespace updates for this object.
    healths: PerWorkerHealthStatus,
    /// The status most recently passed to `record_new_status`, if any. Used to
    /// suppress reports when the decided status has not changed.
    last_reported_status: Option<OverallStatus>,
    /// The most recent update (and its namespace) that asked for a halt, if any.
    halt_with: Option<(StatusNamespace, HealthStatusUpdate)>,
}
232
233impl HealthState {
234    fn new(worker_count: usize) -> HealthState {
235        HealthState {
236            healths: PerWorkerHealthStatus {
237                errors_by_worker: vec![Default::default(); worker_count],
238            },
239            last_reported_status: None,
240            halt_with: None,
241        }
242    }
243}
244
/// A trait that lets a user configure the `health_operator` with custom
/// behavior. This is mostly useful for testing, and the [`DefaultWriter`]
/// should be the correct implementation for everyone.
pub trait HealthOperator {
    /// Record a new status.
    ///
    /// `new_error` is the headline error (if any), `hints` the hints attached
    /// to the status, and `namespaced_errors` the per-namespace errors.
    /// `write_namespaced_map` tells the implementation whether
    /// `namespaced_errors` should actually be written out.
    fn record_new_status(
        &self,
        collection_id: GlobalId,
        ts: DateTime<Utc>,
        new_status: Status,
        new_error: Option<&str>,
        hints: &BTreeSet<String>,
        namespaced_errors: &BTreeMap<StatusNamespace, String>,
        // TODO(guswynn): not urgent:
        // Ideally this would be entirely included in the `DefaultWriter`, but that
        // requires a fairly heavy change to the `health_operator`, which hardcodes
        // some use of persist. For now we just leave it and ignore it in tests.
        write_namespaced_map: bool,
    );
    /// Request a halt on behalf of the object `id`. `error` is the namespace
    /// and update that asked for the halt, if known.
    fn send_halt(&self, id: GlobalId, error: Option<(StatusNamespace, HealthStatusUpdate)>);

    /// Optionally override the chosen worker index. Default is semi-random.
    /// Only useful for tests.
    fn chosen_worker(&self) -> Option<usize> {
        None
    }
}
272
/// A default `HealthOperator` for use in normal cases.
pub struct DefaultWriter {
    /// Channel on which halt requests are sent as
    /// `InternalStorageCommand::SuspendAndRestart` commands.
    pub command_tx: InternalCommandSender,
    /// Shared buffer that every recorded status is appended to.
    /// NOTE(review): presumably drained by another component; the consumer is
    /// not visible in this file.
    pub updates: Rc<RefCell<Vec<StatusUpdate>>>,
}
278
279impl HealthOperator for DefaultWriter {
280    fn record_new_status(
281        &self,
282        collection_id: GlobalId,
283        ts: DateTime<Utc>,
284        status: Status,
285        new_error: Option<&str>,
286        hints: &BTreeSet<String>,
287        namespaced_errors: &BTreeMap<StatusNamespace, String>,
288        write_namespaced_map: bool,
289    ) {
290        self.updates.borrow_mut().push(StatusUpdate {
291            id: collection_id,
292            timestamp: ts,
293            status,
294            error: new_error.map(|e| e.to_string()),
295            hints: hints.clone(),
296            namespaced_errors: if write_namespaced_map {
297                namespaced_errors
298                    .iter()
299                    .map(|(ns, val)| (ns.to_string(), val.clone()))
300                    .collect()
301            } else {
302                BTreeMap::new()
303            },
304            replica_id: None,
305        });
306    }
307
308    fn send_halt(&self, id: GlobalId, error: Option<(StatusNamespace, HealthStatusUpdate)>) {
309        self.command_tx
310            .send(InternalStorageCommand::SuspendAndRestart {
311                // Suspend and restart is expected to operate on the primary object and
312                // not any of the sub-objects
313                id,
314                reason: format!("{:?}", error),
315            });
316    }
317}
318
/// A health message consumed by the `health_operator`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub struct HealthStatusMessage {
    /// The object that this status message is about. When None, it refers to the entire ingestion
    /// as a whole. When Some, it refers to a specific subsource.
    ///
    /// The `health_operator` substitutes its `halting_id` for `None`.
    pub id: Option<GlobalId>,
    /// The namespace of the health update.
    pub namespace: StatusNamespace,
    /// The update itself.
    pub update: HealthStatusUpdate,
}
330
331/// Writes updates that come across `health_stream` to the collection's status shards, as identified
332/// by their `CollectionMetadata`.
333///
334/// Only one worker will be active and write to the status shard.
335///
336/// The `OutputIndex` values that come across `health_stream` must be a strict subset of those in
337/// `configs`'s keys.
338pub(crate) fn health_operator<G, P>(
339    scope: &G,
340    now: NowFn,
341    // A set of id's that should be marked as `HealthStatusUpdate::starting()` during startup.
342    mark_starting: BTreeSet<GlobalId>,
343    // An id that is allowed to halt the dataflow. Others are ignored, and panic during debug
344    // mode.
345    halting_id: GlobalId,
346    // A description of the object type we are writing status updates about. Used in log lines.
347    object_type: &'static str,
348    // An indexed stream of health updates. Indexes are configured in `configs`.
349    health_stream: &Stream<G, HealthStatusMessage>,
350    // An impl of `HealthOperator` that configures the output behavior of this operator.
351    health_operator_impl: P,
352    // Whether or not we should actually write namespaced errors in the `details` column.
353    write_namespaced_map: bool,
354    // How long to wait before initiating a `SuspendAndRestart` command, to
355    // prevent hot restart loops.
356    suspend_and_restart_delay: Duration,
357) -> PressOnDropButton
358where
359    G: Scope,
360    P: HealthOperator + 'static,
361{
362    // Derived config options
363    let healthcheck_worker_id = scope.index();
364    let worker_count = scope.peers();
365
366    // Inject the originating worker id to each item before exchanging to the chosen worker
367    let health_stream = health_stream.map(move |status| (healthcheck_worker_id, status));
368
369    let chosen_worker_id = if let Some(index) = health_operator_impl.chosen_worker() {
370        index
371    } else {
372        // We'll route all the work to a single arbitrary worker;
373        // there's not much to do, and we need a global view.
374        usize::cast_from(mark_starting.iter().next().hashed()) % worker_count
375    };
376
377    let is_active_worker = chosen_worker_id == healthcheck_worker_id;
378
379    let operator_name = format!("healthcheck({})", healthcheck_worker_id);
380    let mut health_op = AsyncOperatorBuilder::new(operator_name, scope.clone());
381
382    let mut input = health_op.new_disconnected_input(
383        &health_stream,
384        Exchange::new(move |_| u64::cast_from(chosen_worker_id)),
385    );
386
387    let button = health_op.build(move |mut _capabilities| async move {
388        let mut health_states: BTreeMap<_, _> = mark_starting
389            .iter()
390            .copied()
391            .chain([halting_id])
392            .map(|id| (id, HealthState::new(worker_count)))
393            .collect();
394
395        // Write the initial starting state to the status shard for all managed objects
396        if is_active_worker {
397            for (id, state) in health_states.iter_mut() {
398                let status = OverallStatus::Starting;
399                let timestamp = mz_ore::now::to_datetime(now());
400                health_operator_impl.record_new_status(
401                    *id,
402                    timestamp,
403                    (&status).into(),
404                    status.error(),
405                    &status.hints(),
406                    status.errors().unwrap_or(&BTreeMap::new()),
407                    write_namespaced_map,
408                );
409
410                state.last_reported_status = Some(status);
411            }
412        }
413
414        let mut outputs_seen = BTreeMap::<GlobalId, BTreeSet<_>>::new();
415        while let Some(event) = input.next().await {
416            if let AsyncEvent::Data(_cap, rows) = event {
417                for (worker_id, message) in rows {
418                    let HealthStatusMessage {
419                        id,
420                        namespace: ns,
421                        update: health_event,
422                    } = message;
423                    let id = id.unwrap_or(halting_id);
424                    let HealthState {
425                        healths, halt_with, ..
426                    } = match health_states.get_mut(&id) {
427                        Some(health) => health,
428                        // This is a health status update for a sub-object_type that we did not request to
429                        // be generated, which means it doesn't have a GlobalId and should not be
430                        // propagated to the shard.
431                        None => continue,
432                    };
433
434                    // Its important to track `new_round` per-namespace, so namespaces are reasoned
435                    // about in `merge_update` independently.
436                    let new_round = outputs_seen
437                        .entry(id)
438                        .or_insert_with(BTreeSet::new)
439                        .insert(ns.clone());
440
441                    if !is_active_worker {
442                        error!(
443                            "Health messages for {object_type} {id} passed to \
444                              an unexpected worker id: {healthcheck_worker_id}"
445                        )
446                    }
447
448                    if health_event.should_halt() {
449                        *halt_with = Some((ns.clone(), health_event.clone()));
450                    }
451
452                    healths.merge_update(worker_id, ns, health_event, !new_round);
453                }
454
455                let mut halt_with_outer = None;
456
457                while let Some((id, _)) = outputs_seen.pop_first() {
458                    let HealthState {
459                        healths,
460                        last_reported_status,
461                        halt_with,
462                    } = health_states.get_mut(&id).expect("known to exist");
463
464                    let new_status = healths.decide_status();
465
466                    if Some(&new_status) != last_reported_status.as_ref() {
467                        info!(
468                            "Health transition for {object_type} {id}: \
469                                  {last_reported_status:?} -> {:?}",
470                            Some(&new_status)
471                        );
472
473                        let timestamp = mz_ore::now::to_datetime(now());
474                        health_operator_impl.record_new_status(
475                            id,
476                            timestamp,
477                            (&new_status).into(),
478                            new_status.error(),
479                            &new_status.hints(),
480                            new_status.errors().unwrap_or(&BTreeMap::new()),
481                            write_namespaced_map,
482                        );
483
484                        *last_reported_status = Some(new_status.clone());
485                    }
486
487                    // Set halt with if None.
488                    if halt_with_outer.is_none() && halt_with.is_some() {
489                        halt_with_outer = Some((id, halt_with.clone()));
490                    }
491                }
492
493                // TODO(aljoscha): Instead of threading through the
494                // `should_halt` bit, we can give an internal command sender
495                // directly to the places where `should_halt = true` originates.
496                // We should definitely do that, but this is okay for a PoC.
497                if let Some((id, halt_with)) = halt_with_outer {
498                    mz_ore::soft_assert_or_log!(
499                        id == halting_id,
500                        "sub{object_type}s should not produce \
501                        halting errors, however {:?} halted while primary \
502                                            {object_type} is {:?}",
503                        id,
504                        halting_id
505                    );
506
507                    info!(
508                        "Broadcasting suspend-and-restart \
509                        command because of {:?} after {:?} delay",
510                        halt_with, suspend_and_restart_delay
511                    );
512                    tokio::time::sleep(suspend_and_restart_delay).await;
513                    health_operator_impl.send_halt(id, halt_with);
514                }
515            }
516        }
517    });
518
519    button.press_on_drop()
520}
521
522use serde::{Deserialize, Serialize};
523
/// A single health update reported for one namespace.
///
/// NB: we derive Ord here, so the enum order matters. Generally, statuses later in the list
/// take precedence over earlier ones: so if one worker is stalled, we'll consider the entire
/// source to be stalled.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub enum HealthStatusUpdate {
    /// The reporting component is running normally.
    Running,
    /// The reporting component hit an error.
    Stalled {
        /// The error message.
        error: String,
        /// An optional hint attached to the error.
        hint: Option<String>,
        /// Whether this update asks for the dataflow to be halted; returned by `should_halt()`.
        should_halt: bool,
    },
    /// Currently unused; see database-issues#7687.
    Ceased {
        /// The error message.
        error: String,
    },
}
539
540impl HealthStatusUpdate {
541    /// Generates a running [`HealthStatusUpdate`].
542    pub(crate) fn running() -> Self {
543        HealthStatusUpdate::Running
544    }
545
546    /// Generates a non-halting [`HealthStatusUpdate`] with `update`.
547    pub(crate) fn stalled(error: String, hint: Option<String>) -> Self {
548        HealthStatusUpdate::Stalled {
549            error,
550            hint,
551            should_halt: false,
552        }
553    }
554
555    /// Generates a halting [`HealthStatusUpdate`] with `update`.
556    pub(crate) fn halting(error: String, hint: Option<String>) -> Self {
557        HealthStatusUpdate::Stalled {
558            error,
559            hint,
560            should_halt: true,
561        }
562    }
563
564    // TODO: redesign ceased status database-issues#7687
565    // Generates a ceasing [`HealthStatusUpdate`] with `update`.
566    // pub(crate) fn ceasing(error: String) -> Self {
567    //     HealthStatusUpdate::Ceased { error }
568    // }
569
570    /// Whether or not we should halt the dataflow instances and restart it.
571    pub(crate) fn should_halt(&self) -> bool {
572        match self {
573            HealthStatusUpdate::Running |
574            // HealthStatusUpdate::Ceased should never halt because it can occur
575            // at the subsource level and should not cause the entire dataflow
576            // to halt. Instead, the dataflow itself should handle shutting
577            // itself down if need be.
578            HealthStatusUpdate::Ceased { .. } => false,
579            HealthStatusUpdate::Stalled { should_halt, .. } => *should_halt,
580        }
581    }
582}
583
584#[cfg(test)]
585mod tests {
586    use super::*;
587    use itertools::Itertools;
588
589    // Actual timely tests for `health_operator`.
590
591    #[mz_ore::test]
592    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
593    fn test_health_operator_basic() {
594        use Step::*;
595
596        // Test 2 inputs across 2 workers.
597        health_operator_runner(
598            2,
599            2,
600            true,
601            vec![
602                AssertStatus(vec![
603                    // Assert both inputs started.
604                    StatusToAssert {
605                        collection_index: 0,
606                        status: Status::Starting,
607                        ..Default::default()
608                    },
609                    StatusToAssert {
610                        collection_index: 1,
611                        status: Status::Starting,
612                        ..Default::default()
613                    },
614                ]),
615                // Update and assert one is running.
616                Update(TestUpdate {
617                    worker_id: 1,
618                    namespace: StatusNamespace::Generator,
619                    id: None,
620                    update: HealthStatusUpdate::running(),
621                }),
622                AssertStatus(vec![StatusToAssert {
623                    collection_index: 0,
624                    status: Status::Running,
625                    ..Default::default()
626                }]),
627                // Assert the other can be stalled by 1 worker.
628                //
629                // TODO(guswynn): ideally we could push these updates
630                // at the same time, but because they are coming from separately
631                // workers, they could end up in different rounds, causing flakes.
632                // For now, we just do this.
633                Update(TestUpdate {
634                    worker_id: 1,
635                    namespace: StatusNamespace::Generator,
636                    id: Some(GlobalId::User(1)),
637                    update: HealthStatusUpdate::running(),
638                }),
639                AssertStatus(vec![StatusToAssert {
640                    collection_index: 1,
641                    status: Status::Running,
642                    ..Default::default()
643                }]),
644                Update(TestUpdate {
645                    worker_id: 0,
646                    namespace: StatusNamespace::Generator,
647                    id: Some(GlobalId::User(1)),
648                    update: HealthStatusUpdate::stalled("uhoh".to_string(), None),
649                }),
650                AssertStatus(vec![StatusToAssert {
651                    collection_index: 1,
652                    status: Status::Stalled,
653                    error: Some("generator: uhoh".to_string()),
654                    errors: Some("generator: uhoh".to_string()),
655                    ..Default::default()
656                }]),
657                // And that it can recover.
658                Update(TestUpdate {
659                    worker_id: 0,
660                    namespace: StatusNamespace::Generator,
661                    id: Some(GlobalId::User(1)),
662                    update: HealthStatusUpdate::running(),
663                }),
664                AssertStatus(vec![StatusToAssert {
665                    collection_index: 1,
666                    status: Status::Running,
667                    ..Default::default()
668                }]),
669            ],
670        );
671    }
672
673    #[mz_ore::test]
674    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
675    fn test_health_operator_write_namespaced_map() {
676        use Step::*;
677
678        // Test 2 inputs across 2 workers.
679        health_operator_runner(
680            2,
681            2,
682            // testing this
683            false,
684            vec![
685                AssertStatus(vec![
686                    // Assert both inputs started.
687                    StatusToAssert {
688                        collection_index: 0,
689                        status: Status::Starting,
690                        ..Default::default()
691                    },
692                    StatusToAssert {
693                        collection_index: 1,
694                        status: Status::Starting,
695                        ..Default::default()
696                    },
697                ]),
698                Update(TestUpdate {
699                    worker_id: 0,
700                    namespace: StatusNamespace::Generator,
701                    id: Some(GlobalId::User(1)),
702                    update: HealthStatusUpdate::stalled("uhoh".to_string(), None),
703                }),
704                AssertStatus(vec![StatusToAssert {
705                    collection_index: 1,
706                    status: Status::Stalled,
707                    error: Some("generator: uhoh".to_string()),
708                    errors: None,
709                    ..Default::default()
710                }]),
711            ],
712        )
713    }
714
715    #[mz_ore::test]
716    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
717    fn test_health_operator_namespaces() {
718        use Step::*;
719
720        // Test 2 inputs across 2 workers.
721        health_operator_runner(
722            2,
723            1,
724            true,
725            vec![
726                AssertStatus(vec![
727                    // Assert both inputs started.
728                    StatusToAssert {
729                        collection_index: 0,
730                        status: Status::Starting,
731                        ..Default::default()
732                    },
733                ]),
734                // Assert that we merge namespaced errors correctly.
735                //
736                // Note that these all happen on the same worker id.
737                Update(TestUpdate {
738                    worker_id: 0,
739                    namespace: StatusNamespace::Generator,
740                    id: None,
741                    update: HealthStatusUpdate::stalled("uhoh".to_string(), None),
742                }),
743                AssertStatus(vec![StatusToAssert {
744                    collection_index: 0,
745                    status: Status::Stalled,
746                    error: Some("generator: uhoh".to_string()),
747                    errors: Some("generator: uhoh".to_string()),
748                    ..Default::default()
749                }]),
750                Update(TestUpdate {
751                    worker_id: 0,
752                    namespace: StatusNamespace::Kafka,
753                    id: None,
754                    update: HealthStatusUpdate::stalled("uhoh".to_string(), None),
755                }),
756                AssertStatus(vec![StatusToAssert {
757                    collection_index: 0,
758                    status: Status::Stalled,
759                    error: Some("kafka: uhoh".to_string()),
760                    errors: Some("generator: uhoh, kafka: uhoh".to_string()),
761                    ..Default::default()
762                }]),
763                // And that it can recover.
764                Update(TestUpdate {
765                    worker_id: 0,
766                    namespace: StatusNamespace::Kafka,
767                    id: None,
768                    update: HealthStatusUpdate::running(),
769                }),
770                AssertStatus(vec![StatusToAssert {
771                    collection_index: 0,
772                    status: Status::Stalled,
773                    error: Some("generator: uhoh".to_string()),
774                    errors: Some("generator: uhoh".to_string()),
775                    ..Default::default()
776                }]),
777                Update(TestUpdate {
778                    worker_id: 0,
779                    namespace: StatusNamespace::Generator,
780                    id: None,
781                    update: HealthStatusUpdate::running(),
782                }),
783                AssertStatus(vec![StatusToAssert {
784                    collection_index: 0,
785                    status: Status::Running,
786                    ..Default::default()
787                }]),
788            ],
789        );
790    }
791
792    #[mz_ore::test]
793    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
794    fn test_health_operator_namespace_side_channel() {
795        use Step::*;
796
797        health_operator_runner(
798            2,
799            1,
800            true,
801            vec![
802                AssertStatus(vec![
803                    // Assert both inputs started.
804                    StatusToAssert {
805                        collection_index: 0,
806                        status: Status::Starting,
807                        ..Default::default()
808                    },
809                ]),
810                // Assert that sidechannel namespaces don't downgrade the status
811                //
812                // Note that these all happen on the same worker id.
813                Update(TestUpdate {
814                    worker_id: 0,
815                    namespace: StatusNamespace::Ssh,
816                    id: None,
817                    update: HealthStatusUpdate::stalled("uhoh".to_string(), None),
818                }),
819                AssertStatus(vec![StatusToAssert {
820                    collection_index: 0,
821                    status: Status::Stalled,
822                    error: Some("ssh: uhoh".to_string()),
823                    errors: Some("ssh: uhoh".to_string()),
824                    ..Default::default()
825                }]),
826                Update(TestUpdate {
827                    worker_id: 0,
828                    namespace: StatusNamespace::Ssh,
829                    id: None,
830                    update: HealthStatusUpdate::stalled("uhoh2".to_string(), None),
831                }),
832                AssertStatus(vec![StatusToAssert {
833                    collection_index: 0,
834                    status: Status::Stalled,
835                    error: Some("ssh: uhoh2".to_string()),
836                    errors: Some("ssh: uhoh2".to_string()),
837                    ..Default::default()
838                }]),
839                Update(TestUpdate {
840                    worker_id: 0,
841                    namespace: StatusNamespace::Ssh,
842                    id: None,
843                    update: HealthStatusUpdate::running(),
844                }),
845                // We haven't starting running yet, as a `Default` namespace hasn't told us.
846                AssertStatus(vec![StatusToAssert {
847                    collection_index: 0,
848                    status: Status::Starting,
849                    ..Default::default()
850                }]),
851                Update(TestUpdate {
852                    worker_id: 0,
853                    namespace: StatusNamespace::Generator,
854                    id: None,
855                    update: HealthStatusUpdate::running(),
856                }),
857                AssertStatus(vec![StatusToAssert {
858                    collection_index: 0,
859                    status: Status::Running,
860                    ..Default::default()
861                }]),
862            ],
863        );
864    }
865
866    #[mz_ore::test]
867    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
868    fn test_health_operator_hints() {
869        use Step::*;
870
871        health_operator_runner(
872            2,
873            1,
874            true,
875            vec![
876                AssertStatus(vec![
877                    // Assert both inputs started.
878                    StatusToAssert {
879                        collection_index: 0,
880                        status: Status::Starting,
881                        ..Default::default()
882                    },
883                ]),
884                // Note that these all happen across worker ids.
885                Update(TestUpdate {
886                    worker_id: 0,
887                    namespace: StatusNamespace::Generator,
888                    id: None,
889                    update: HealthStatusUpdate::stalled(
890                        "uhoh".to_string(),
891                        Some("hint1".to_string()),
892                    ),
893                }),
894                AssertStatus(vec![StatusToAssert {
895                    collection_index: 0,
896                    status: Status::Stalled,
897                    error: Some("generator: uhoh".to_string()),
898                    errors: Some("generator: uhoh".to_string()),
899                    hint: Some("hint1".to_string()),
900                }]),
901                Update(TestUpdate {
902                    worker_id: 1,
903                    namespace: StatusNamespace::Generator,
904                    id: None,
905                    update: HealthStatusUpdate::stalled(
906                        "uhoh2".to_string(),
907                        Some("hint2".to_string()),
908                    ),
909                }),
910                AssertStatus(vec![StatusToAssert {
911                    collection_index: 0,
912                    status: Status::Stalled,
913                    // Note the error sorts later so we just use that.
914                    error: Some("generator: uhoh2".to_string()),
915                    errors: Some("generator: uhoh2".to_string()),
916                    hint: Some("hint1, hint2".to_string()),
917                }]),
918                // Update one of the hints
919                Update(TestUpdate {
920                    worker_id: 1,
921                    namespace: StatusNamespace::Generator,
922                    id: None,
923                    update: HealthStatusUpdate::stalled(
924                        "uhoh2".to_string(),
925                        Some("hint3".to_string()),
926                    ),
927                }),
928                AssertStatus(vec![StatusToAssert {
929                    collection_index: 0,
930                    status: Status::Stalled,
931                    // Note the error sorts later so we just use that.
932                    error: Some("generator: uhoh2".to_string()),
933                    errors: Some("generator: uhoh2".to_string()),
934                    hint: Some("hint1, hint3".to_string()),
935                }]),
936                // Assert recovery.
937                Update(TestUpdate {
938                    worker_id: 0,
939                    namespace: StatusNamespace::Generator,
940                    id: None,
941                    update: HealthStatusUpdate::running(),
942                }),
943                AssertStatus(vec![StatusToAssert {
944                    collection_index: 0,
945                    status: Status::Stalled,
946                    // Note the error sorts later so we just use that.
947                    error: Some("generator: uhoh2".to_string()),
948                    errors: Some("generator: uhoh2".to_string()),
949                    hint: Some("hint3".to_string()),
950                }]),
951                Update(TestUpdate {
952                    worker_id: 1,
953                    namespace: StatusNamespace::Generator,
954                    id: None,
955                    update: HealthStatusUpdate::running(),
956                }),
957                AssertStatus(vec![StatusToAssert {
958                    collection_index: 0,
959                    status: Status::Running,
960                    ..Default::default()
961                }]),
962            ],
963        );
964    }
965
966    // The below is ALL test infrastructure for the above
967
968    use mz_ore::assert_err;
969    use timely::container::CapacityContainerBuilder;
970    use timely::dataflow::Scope;
971    use timely::dataflow::operators::Enter;
972    use timely::dataflow::operators::exchange::Exchange;
973    use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
974
    /// A status to assert.
    ///
    /// Used both as the expectation listed in an `AssertStatus` step and as the value
    /// `TestWriter::record_new_status` sends to the test driver; the two are compared
    /// with `assert_eq!`.
    #[derive(Debug, Clone, PartialEq, Eq)]
    struct StatusToAssert {
        /// Index of the collection, as assigned by `health_operator_runner`.
        collection_index: usize,
        /// The status recorded for the collection.
        status: Status,
        /// The `new_error` passed to `record_new_status`, if any.
        error: Option<String>,
        /// The namespaced errors rendered as `<namespace>: <error>` and joined with `", "`.
        /// `None` when there are no namespaced errors or the namespaced map is not written.
        errors: Option<String>,
        /// The hints joined with `", "`; `None` when there are no hints.
        hint: Option<String>,
    }
984
985    impl Default for StatusToAssert {
986        fn default() -> Self {
987            StatusToAssert {
988                collection_index: Default::default(),
989                status: Status::Running,
990                error: Default::default(),
991                errors: Default::default(),
992                hint: Default::default(),
993            }
994        }
995    }
996
    /// An update to push into the operator.
    /// Can come from any worker, and from any input.
    #[derive(Debug, Clone)]
    struct TestUpdate {
        /// Routing key: `producer` exchanges the update on this value before it is
        /// turned into a `HealthStatusMessage`.
        worker_id: u64,
        /// The namespace the update is reported under.
        namespace: StatusNamespace,
        /// Forwarded unchanged as the `id` of the resulting `HealthStatusMessage`.
        id: Option<GlobalId>,
        /// The health status update itself.
        update: HealthStatusUpdate,
    }
1006
    /// One step of a test scenario; `health_operator_runner` executes the steps in order.
    #[derive(Debug, Clone)]
    enum Step {
        /// Insert a new health update.
        Update(TestUpdate),
        /// Assert a set of outputs. Note that these should
        /// have unique `collection_index`'s
        ///
        /// The runner keeps stepping the worker until it has received one status for
        /// every entry, matching each received status to an entry by `collection_index`.
        AssertStatus(Vec<StatusToAssert>),
    }
1015
    /// A `HealthOperator` used by the tests: every recorded status is sent over a
    /// channel as a `StatusToAssert` so the test driver can assert on it.
    struct TestWriter {
        /// Channel on which each recorded status is sent.
        sender: UnboundedSender<StatusToAssert>,
        /// Maps a collection id to the `collection_index` reported in `StatusToAssert`.
        input_mapping: BTreeMap<GlobalId, usize>,
    }
1020
1021    impl HealthOperator for TestWriter {
1022        fn record_new_status(
1023            &self,
1024            collection_id: GlobalId,
1025            _ts: DateTime<Utc>,
1026            status: Status,
1027            new_error: Option<&str>,
1028            hints: &BTreeSet<String>,
1029            namespaced_errors: &BTreeMap<StatusNamespace, String>,
1030            write_namespaced_map: bool,
1031        ) {
1032            let _ = self.sender.send(StatusToAssert {
1033                collection_index: *self.input_mapping.get(&collection_id).unwrap(),
1034                status,
1035                error: new_error.map(str::to_string),
1036                errors: if !namespaced_errors.is_empty() && write_namespaced_map {
1037                    Some(
1038                        namespaced_errors
1039                            .iter()
1040                            .map(|(ns, err)| format!("{}: {}", ns, err))
1041                            .join(", "),
1042                    )
1043                } else {
1044                    None
1045                },
1046                hint: if !hints.is_empty() {
1047                    Some(hints.iter().join(", "))
1048                } else {
1049                    None
1050                },
1051            });
1052        }
1053
1054        fn send_halt(&self, _id: GlobalId, _error: Option<(StatusNamespace, HealthStatusUpdate)>) {
1055            // Not yet unit-tested
1056            unimplemented!()
1057        }
1058
1059        fn chosen_worker(&self) -> Option<usize> {
1060            // We input and assert outputs on the first worker.
1061            Some(0)
1062        }
1063    }
1064
1065    /// Setup a `health_operator` with a set number of workers and inputs, and the
1066    /// steps on the first worker.
1067    fn health_operator_runner(
1068        workers: usize,
1069        inputs: usize,
1070        write_namespaced_map: bool,
1071        steps: Vec<Step>,
1072    ) {
1073        let tokio_runtime = tokio::runtime::Runtime::new().unwrap();
1074        let tokio_handle = tokio_runtime.handle().clone();
1075
1076        let inputs: BTreeMap<GlobalId, usize> = (0..inputs)
1077            .map(|index| (GlobalId::User(u64::cast_from(index)), index))
1078            .collect();
1079
1080        timely::execute::execute(
1081            timely::execute::Config {
1082                communication: timely::CommunicationConfig::Process(workers),
1083                worker: Default::default(),
1084            },
1085            move |worker| {
1086                let steps = steps.clone();
1087                let inputs = inputs.clone();
1088
1089                let _tokio_guard = tokio_handle.enter();
1090                let (in_tx, in_rx) = unbounded_channel();
1091                let (out_tx, mut out_rx) = unbounded_channel();
1092
1093                worker.dataflow::<(), _, _>(|root_scope| {
1094                    root_scope
1095                        .clone()
1096                        .scoped::<mz_repr::Timestamp, _, _>("gus", |scope| {
1097                            let input = producer(root_scope.clone(), in_rx).enter(scope);
1098                            Box::leak(Box::new(health_operator(
1099                                scope,
1100                                mz_ore::now::SYSTEM_TIME.clone(),
1101                                inputs.keys().copied().collect(),
1102                                *inputs.first_key_value().unwrap().0,
1103                                "source_test",
1104                                &input,
1105                                TestWriter {
1106                                    sender: out_tx,
1107                                    input_mapping: inputs,
1108                                },
1109                                write_namespaced_map,
1110                                Duration::from_secs(5),
1111                            )));
1112                        });
1113                });
1114
1115                // We arbitrarily do all the testing on the first worker.
1116                if worker.index() == 0 {
1117                    use Step::*;
1118                    for step in steps {
1119                        match step {
1120                            Update(update) => {
1121                                let _ = in_tx.send(update);
1122                            }
1123                            AssertStatus(mut statuses) => loop {
1124                                match out_rx.try_recv() {
1125                                    Err(_) => {
1126                                        worker.step();
1127                                        // This makes testing easier.
1128                                        std::thread::sleep(std::time::Duration::from_millis(50));
1129                                    }
1130                                    Ok(update) => {
1131                                        let pos = statuses
1132                                            .iter()
1133                                            .position(|s| {
1134                                                s.collection_index == update.collection_index
1135                                            })
1136                                            .unwrap();
1137
1138                                        let status_to_assert = &statuses[pos];
1139                                        assert_eq!(&update, status_to_assert);
1140
1141                                        statuses.remove(pos);
1142                                        if statuses.is_empty() {
1143                                            break;
1144                                        }
1145                                    }
1146                                }
1147                            },
1148                        }
1149                    }
1150
1151                    // Assert that nothing is left in the channel.
1152                    assert_err!(out_rx.try_recv());
1153                }
1154            },
1155        )
1156        .unwrap();
1157    }
1158
1159    /// Produces (input_index, HealthStatusUpdate)'s based on the input channel.
1160    ///
1161    /// Only the first worker is used, all others immediately drop their capabilities and channels.
1162    /// After the channel is empty on the first worker, then the frontier will go to [].
1163    /// Also ensures that updates are routed to the correct worker based on the `TestUpdate`
1164    /// using an exchange.
1165    fn producer<G: Scope<Timestamp = ()>>(
1166        scope: G,
1167        mut input: UnboundedReceiver<TestUpdate>,
1168    ) -> Stream<G, HealthStatusMessage> {
1169        let mut iterator = AsyncOperatorBuilder::new("iterator".to_string(), scope.clone());
1170        let (output_handle, output) = iterator.new_output::<CapacityContainerBuilder<Vec<_>>>();
1171
1172        let index = scope.index();
1173        iterator.build(|mut caps| async move {
1174            // We input and assert outputs on the first worker.
1175            if index != 0 {
1176                return;
1177            }
1178            let mut capability = Some(caps.pop().unwrap());
1179            while let Some(element) = input.recv().await {
1180                output_handle.give(
1181                    capability.as_ref().unwrap(),
1182                    (
1183                        element.worker_id,
1184                        element.id,
1185                        element.namespace,
1186                        element.update,
1187                    ),
1188                );
1189            }
1190
1191            capability.take();
1192        });
1193
1194        let output = output.exchange(|d| d.0).map(|d| HealthStatusMessage {
1195            id: d.1,
1196            namespace: d.2,
1197            update: d.3,
1198        });
1199
1200        output
1201    }
1202}