mz_compute/
command_channel.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! A channel for broadcasting compute commands from worker 0 to other workers.
11//!
12//! Compute uses a dataflow to distribute commands between workers. This is to ensure workers
13//! retain a consistent dataflow state across reconnects. If each worker received its commands
14//! directly from the controller, there wouldn't be any guarantee that after a reconnect all
15//! workers have seen the same sequence of commands. This is particularly problematic for
16//! `CreateDataflow` commands, since Timely requires that all workers render the same dataflows in
17//! the same order. So the controller instead sends commands only to worker 0, which then
18//! broadcasts them to other workers through the Timely fabric, taking care of the correct
19//! sequencing.
20//!
21//! Commands in the command channel are tagged with an epoch identifying the incarnation of the
22//! compute protocol the command belongs to, allowing workers to recognize client reconnects that
23//! require a reconciliation.
24
25use std::sync::{Arc, Mutex};
26
27use crossbeam_channel::TryRecvError;
28use mz_compute_client::protocol::command::ComputeCommand;
29use mz_compute_types::dataflows::{BuildDesc, DataflowDescription};
30use mz_ore::cast::CastFrom;
31use timely::communication::Allocate;
32use timely::dataflow::channels::pact::Exchange;
33use timely::dataflow::operators::Operator;
34use timely::dataflow::operators::generic::source;
35use timely::scheduling::{Scheduler, SyncActivator};
36use timely::worker::Worker as TimelyWorker;
37
38/// A sender pushing commands onto the command channel.
39pub struct Sender {
40    tx: crossbeam_channel::Sender<(ComputeCommand, u64)>,
41    activator: Arc<Mutex<Option<SyncActivator>>>,
42}
43
44impl Sender {
45    /// Broadcasts the given command to all workers.
46    pub fn send(&self, message: (ComputeCommand, u64)) {
47        if self.tx.send(message).is_err() {
48            unreachable!("command channel never shuts down");
49        }
50
51        self.activator
52            .lock()
53            .expect("poisoned")
54            .as_ref()
55            .map(|a| a.activate());
56    }
57}
58
59/// A receiver reading commands from the command channel.
60pub struct Receiver {
61    rx: crossbeam_channel::Receiver<(ComputeCommand, u64)>,
62}
63
64impl Receiver {
65    /// Returns the next available command, if any.
66    ///
67    /// This returns `None` when there are currently no commands but there might be commands again
68    /// in the future.
69    pub fn try_recv(&self) -> Option<(ComputeCommand, u64)> {
70        match self.rx.try_recv() {
71            Ok(msg) => Some(msg),
72            Err(TryRecvError::Empty) => None,
73            Err(TryRecvError::Disconnected) => {
74                unreachable!("command channel never shuts down");
75            }
76        }
77    }
78}
79
80/// Render the command channel dataflow.
81pub fn render<A: Allocate>(timely_worker: &mut TimelyWorker<A>) -> (Sender, Receiver) {
82    let (input_tx, input_rx) = crossbeam_channel::unbounded();
83    let (output_tx, output_rx) = crossbeam_channel::unbounded();
84    let activator = Arc::new(Mutex::new(None));
85
86    // TODO(teskje): This implementation relies on Timely channels preserving the order of their
87    // inputs, which is not something they guarantee. We can avoid that by using explicit indexing,
88    // like storage's command sequencer does.
89    timely_worker.dataflow_named::<u64, _, _>("command_channel", {
90        let activator = Arc::clone(&activator);
91        move |scope| {
92            source(scope, "command_channel::source", |cap, info| {
93                let sync_activator = scope.sync_activator_for(info.address.to_vec());
94                *activator.lock().expect("poisoned") = Some(sync_activator);
95
96                let worker_id = scope.index();
97                let peers = scope.peers();
98
99                // Only worker 0 broadcasts commands, other workers must drop their capability to
100                // avoid holding up dataflow progress.
101                let mut capability = (worker_id == 0).then_some(cap);
102
103                move |output| {
104                    let Some(cap) = &mut capability else {
105                        // Non-leader workers will still receive `UpdateConfiguration` commands and
106                        // we must drain those to not leak memory.
107                        while let Ok((cmd, _epoch)) = input_rx.try_recv() {
108                            assert_ne!(worker_id, 0);
109                            assert!(matches!(cmd, ComputeCommand::UpdateConfiguration(_)));
110                        }
111                        return;
112                    };
113
114                    assert_eq!(worker_id, 0);
115
116                    let input: Vec<_> = input_rx.try_iter().collect();
117                    for (cmd, epoch) in input {
118                        let worker_cmds =
119                            split_command(cmd, peers).map(|(idx, cmd)| (idx, cmd, epoch));
120                        output.session(&cap).give_iterator(worker_cmds);
121
122                        cap.downgrade(&(cap.time() + 1));
123                    }
124                }
125            })
126            .sink(
127                Exchange::new(|(idx, _, _)| u64::cast_from(*idx)),
128                "command_channel::sink",
129                move |input| {
130                    while let Some((_cap, data)) = input.next() {
131                        for (_idx, cmd, epoch) in data.drain(..) {
132                            let _ = output_tx.send((cmd, epoch));
133                        }
134                    }
135                },
136            );
137        }
138    });
139
140    let tx = Sender {
141        tx: input_tx,
142        activator,
143    };
144    let rx = Receiver { rx: output_rx };
145
146    (tx, rx)
147}
148
149/// Split the given command into the given number of parts.
150///
151/// Returns an iterator that produces each command part, along with its part index.
152fn split_command(
153    command: ComputeCommand,
154    parts: usize,
155) -> impl Iterator<Item = (usize, ComputeCommand)> {
156    use itertools::Either;
157
158    let commands = match command {
159        ComputeCommand::CreateDataflow(dataflow) => {
160            // A list of descriptions of objects for each part to build.
161            let mut builds_parts = vec![Vec::new(); parts];
162            // Partition each build description among `parts`.
163            for build_desc in dataflow.objects_to_build {
164                let build_part = build_desc.plan.partition_among(parts);
165                for (plan, objects_to_build) in build_part.into_iter().zip(builds_parts.iter_mut())
166                {
167                    objects_to_build.push(BuildDesc {
168                        id: build_desc.id,
169                        plan,
170                    });
171                }
172            }
173
174            // Each list of build descriptions results in a dataflow description.
175            let commands = builds_parts
176                .into_iter()
177                .map(move |objects_to_build| DataflowDescription {
178                    source_imports: dataflow.source_imports.clone(),
179                    index_imports: dataflow.index_imports.clone(),
180                    objects_to_build,
181                    index_exports: dataflow.index_exports.clone(),
182                    sink_exports: dataflow.sink_exports.clone(),
183                    as_of: dataflow.as_of.clone(),
184                    until: dataflow.until.clone(),
185                    debug_name: dataflow.debug_name.clone(),
186                    initial_storage_as_of: dataflow.initial_storage_as_of.clone(),
187                    refresh_schedule: dataflow.refresh_schedule.clone(),
188                    time_dependence: dataflow.time_dependence.clone(),
189                })
190                .map(ComputeCommand::CreateDataflow);
191            Either::Left(commands)
192        }
193        command => {
194            let commands = std::iter::repeat_n(command, parts);
195            Either::Right(commands)
196        }
197    };
198
199    commands.into_iter().enumerate()
200}