// mz_compute/command_channel.rs
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! A channel for broadcasting compute commands from worker 0 to other workers.
//!
//! Compute uses a dataflow to distribute commands between workers. This is to ensure workers
//! retain a consistent dataflow state across reconnects. If each worker received its commands
//! directly from the controller, there wouldn't be any guarantee that after a reconnect all
//! workers have seen the same sequence of commands. This is particularly problematic for
//! `CreateDataflow` commands, since Timely requires that all workers render the same dataflows in
//! the same order. So the controller instead sends commands only to worker 0, which then
//! broadcasts them to other workers through the Timely fabric, taking care of the correct
//! sequencing.
//!
//! Commands in the command channel are tagged with a nonce identifying the incarnation of the
//! compute protocol the command belongs to, allowing workers to recognize client reconnects that
//! require a reconciliation.
use std::sync::mpsc::{self, TryRecvError};
use std::sync::{Arc, Mutex};

use itertools::Itertools;
use mz_compute_client::protocol::command::ComputeCommand;
use mz_compute_types::dataflows::{BuildDesc, DataflowDescription};
use mz_ore::cast::CastFrom;
use mz_timely_util::scope_label::ScopeExt;
use timely::dataflow::channels::pact::Exchange;
use timely::dataflow::operators::Operator;
use timely::dataflow::operators::generic::source;
use timely::scheduling::SyncActivator;
use timely::worker::Worker as TimelyWorker;
use uuid::Uuid;

/// A sender pushing commands onto the command channel.
pub struct Sender {
    // Feeds the `command_channel::source` operator rendered by `render`; each command is
    // tagged with the nonce of the compute protocol incarnation it belongs to.
    tx: mpsc::Sender<(ComputeCommand, Uuid)>,
    // Activator for the source operator. `render` fills this in once the dataflow is built;
    // until then it is `None` and `send` has nothing to activate.
    activator: Arc<Mutex<Option<SyncActivator>>>,
}

46impl Sender {
47    /// Broadcasts the given command to all workers.
48    pub fn send(&self, message: (ComputeCommand, Uuid)) {
49        if self.tx.send(message).is_err() {
50            unreachable!("command channel never shuts down");
51        }
52
53        self.activator
54            .lock()
55            .expect("poisoned")
56            .as_ref()
57            .map(|a| a.activate());
58    }
59}
60
/// A receiver reading commands from the command channel.
pub struct Receiver {
    // Drained by the `command_channel::sink` operator rendered by `render`; carries this
    // worker's share of the broadcast commands, each tagged with its protocol nonce.
    rx: mpsc::Receiver<(ComputeCommand, Uuid)>,
}

66impl Receiver {
67    /// Returns the next available command, if any.
68    ///
69    /// This returns `None` when there are currently no commands but there might be commands again
70    /// in the future.
71    pub fn try_recv(&self) -> Option<(ComputeCommand, Uuid)> {
72        match self.rx.try_recv() {
73            Ok(msg) => Some(msg),
74            Err(TryRecvError::Empty) => None,
75            Err(TryRecvError::Disconnected) => {
76                unreachable!("command channel never shuts down");
77            }
78        }
79    }
80}
81
/// Render the command channel dataflow.
///
/// Returns a [`Sender`] through which the controller-facing client pushes commands (only
/// worker 0's sender feeds the broadcast) and a [`Receiver`] yielding this worker's share of
/// the broadcast commands.
///
/// NOTE(review): Timely's `Worker` is normally generic over its communication allocator
/// (`Worker<A: Allocate>`); the missing type parameter here looks like it may have been lost
/// in extraction — confirm against the original source.
pub fn render(timely_worker: &mut TimelyWorker) -> (Sender, Receiver) {
    // `input_*` feeds worker 0's source operator; `output_*` drains this worker's sink.
    let (input_tx, input_rx) = mpsc::channel();
    let (output_tx, output_rx) = mpsc::channel();
    // Slot for the source operator's activator, filled in while building the dataflow below.
    let activator = Arc::new(Mutex::new(None));

    // TODO(teskje): This implementation relies on Timely channels preserving the order of their
    // inputs, which is not something they guarantee. We can avoid that by using explicit indexing,
    // like storage's command sequencer does.
    timely_worker.dataflow_named::<u64, _, _>("command_channel", {
        let activator = Arc::clone(&activator);
        move |scope| {
            let scope = scope.with_label();

            source(scope, "command_channel::source", |cap, info| {
                // Register a sync activator so `Sender::send` can wake this operator when new
                // commands arrive.
                let sync_activator = scope.worker().sync_activator_for(info.address.to_vec());
                *activator.lock().expect("poisoned") = Some(sync_activator);

                let worker_id = scope.index();
                let peers = scope.peers();

                // Only worker 0 broadcasts commands, other workers must drop their capability to
                // avoid holding up dataflow progress.
                let mut capability = (worker_id == 0).then_some(cap);

                move |output| {
                    let Some(cap) = &mut capability else {
                        // Non-leader workers will still receive `UpdateConfiguration` commands and
                        // we must drain those to not leak memory.
                        while let Ok((cmd, _nonce)) = input_rx.try_recv() {
                            assert_ne!(worker_id, 0);
                            assert!(matches!(cmd, ComputeCommand::UpdateConfiguration(_)));
                        }
                        return;
                    };

                    assert_eq!(worker_id, 0);

                    let input: Vec<_> = input_rx.try_iter().collect();
                    for (cmd, nonce) in input {
                        // Split the command into one part per worker and tag each part with its
                        // destination worker index, which the exchange below routes on.
                        let worker_cmds =
                            split_command(cmd, peers).map(|(idx, cmd)| (idx, cmd, nonce));
                        output.session(&cap).give_iterator(worker_cmds);

                        // Emit each command at its own, strictly increasing timestamp.
                        cap.downgrade(&(cap.time() + 1));
                    }
                }
            })
            .sink(
                // Route each command part to the worker whose index it carries.
                Exchange::new(|(idx, _, _)| u64::cast_from(*idx)),
                "command_channel::sink",
                move |(input, _)| {
                    input.for_each(|_time, data| {
                        for (_idx, cmd, nonce) in data.drain(..) {
                            // A send error means the `Receiver` was dropped; forwarding is
                            // best-effort, so the error is deliberately ignored.
                            let _ = output_tx.send((cmd, nonce));
                        }
                    });
                },
            );
        }
    });

    let tx = Sender {
        tx: input_tx,
        activator,
    };
    let rx = Receiver { rx: output_rx };

    (tx, rx)
}

153/// Split the given command into the given number of parts.
154///
155/// Returns an iterator that produces each command part, along with its part index.
156fn split_command(
157    command: ComputeCommand,
158    parts: usize,
159) -> impl Iterator<Item = (usize, ComputeCommand)> {
160    use itertools::Either;
161
162    let commands = match command {
163        ComputeCommand::CreateDataflow(dataflow) => {
164            let dataflow = *dataflow;
165
166            // A list of descriptions of objects for each part to build.
167            let mut builds_parts = vec![Vec::new(); parts];
168            // Partition each build description among `parts`.
169            for build_desc in dataflow.objects_to_build {
170                let build_part = build_desc.plan.partition_among(parts);
171                for (plan, objects_to_build) in
172                    build_part.into_iter().zip_eq(builds_parts.iter_mut())
173                {
174                    objects_to_build.push(BuildDesc {
175                        id: build_desc.id,
176                        plan,
177                    });
178                }
179            }
180
181            // Each list of build descriptions results in a dataflow description.
182            let commands = builds_parts
183                .into_iter()
184                .map(move |objects_to_build| DataflowDescription {
185                    source_imports: dataflow.source_imports.clone(),
186                    index_imports: dataflow.index_imports.clone(),
187                    objects_to_build,
188                    index_exports: dataflow.index_exports.clone(),
189                    sink_exports: dataflow.sink_exports.clone(),
190                    as_of: dataflow.as_of.clone(),
191                    until: dataflow.until.clone(),
192                    debug_name: dataflow.debug_name.clone(),
193                    initial_storage_as_of: dataflow.initial_storage_as_of.clone(),
194                    refresh_schedule: dataflow.refresh_schedule.clone(),
195                    time_dependence: dataflow.time_dependence.clone(),
196                })
197                .map(Box::new)
198                .map(ComputeCommand::CreateDataflow);
199            Either::Left(commands)
200        }
201        command => {
202            let commands = std::iter::repeat_n(command, parts);
203            Either::Right(commands)
204        }
205    };
206
207    commands.into_iter().enumerate()
208}