1use std::sync::{Arc, Mutex};
13use std::time::Duration;
14
15use mz_cluster::client::{ClusterClient, ClusterSpec};
16use mz_cluster_client::client::TimelyConfig;
17use mz_ore::metrics::MetricsRegistry;
18use mz_ore::now::NowFn;
19use mz_ore::tracing::TracingHandle;
20use mz_persist_client::cache::PersistClientCache;
21use mz_rocksdb::config::SharedWriteBufferManager;
22use mz_storage_client::client::{StorageClient, StorageCommand, StorageResponse};
23use mz_storage_types::connections::ConnectionContext;
24use mz_timely_util::capture::EventLink;
25use mz_txn_wal::operator::TxnsContext;
26use timely::logging::{
27 ChannelsEvent, MessagesEvent, OperatesEvent, ScheduleEvent, ShutdownEvent, TimelyEvent,
28};
29use timely::worker::Worker as TimelyWorker;
30use tokio::sync::mpsc;
31use uuid::Uuid;
32
33use crate::metrics::StorageMetrics;
34use crate::storage_state::{StorageInstanceContext, Worker};
35
36#[derive(Clone)]
38struct Config {
39 pub persist_clients: Arc<PersistClientCache>,
41 pub txns_ctx: TxnsContext,
43 pub tracing_handle: Arc<TracingHandle>,
45 pub now: NowFn,
47 pub connection_context: ConnectionContext,
49 pub instance_context: StorageInstanceContext,
51
52 pub metrics: StorageMetrics,
54 pub shared_rocksdb_write_buffer_manager: SharedWriteBufferManager,
56 pub workers_per_process: usize,
58 pub timely_log_writers: Arc<Mutex<Vec<Option<TimelyLogWriter>>>>,
61}
62
63pub(crate) type TimelyLogWriter = Arc<EventLink<mz_repr::Timestamp, Vec<(Duration, TimelyEvent)>>>;
65
66pub async fn serve(
68 timely_config: TimelyConfig,
69 metrics_registry: &MetricsRegistry,
70 persist_clients: Arc<PersistClientCache>,
71 txns_ctx: TxnsContext,
72 tracing_handle: Arc<TracingHandle>,
73 now: NowFn,
74 connection_context: ConnectionContext,
75 instance_context: StorageInstanceContext,
76 timely_log_writers: Vec<TimelyLogWriter>,
77) -> Result<impl Fn() -> Box<dyn StorageClient> + use<>, anyhow::Error> {
78 let workers_per_process = timely_config.workers;
79 let timely_log_writers = if timely_log_writers.is_empty() {
83 (0..workers_per_process).map(|_| None).collect()
84 } else {
85 assert_eq!(timely_log_writers.len(), workers_per_process);
86 timely_log_writers.into_iter().map(Some).collect()
87 };
88 let config = Config {
89 persist_clients,
90 txns_ctx,
91 tracing_handle,
92 now,
93 connection_context,
94 instance_context,
95 metrics: StorageMetrics::register_with(metrics_registry),
96 shared_rocksdb_write_buffer_manager: Default::default(),
100 workers_per_process,
101 timely_log_writers: Arc::new(Mutex::new(timely_log_writers)),
102 };
103 let tokio_executor = tokio::runtime::Handle::current();
104
105 let timely_container = config.build_cluster(timely_config, tokio_executor).await?;
106 let timely_container = Arc::new(Mutex::new(timely_container));
107
108 let client_builder = move || {
109 let client = ClusterClient::new(Arc::clone(&timely_container));
110 let client: Box<dyn StorageClient> = Box::new(client);
111 client
112 };
113
114 Ok(client_builder)
115}
116
117impl ClusterSpec for Config {
118 type Command = StorageCommand;
119 type Response = StorageResponse;
120
121 const NAME: &str = "storage";
122
123 fn run_worker(
124 &self,
125 timely_worker: &mut TimelyWorker,
126 client_rx: mpsc::UnboundedReceiver<(
127 Uuid,
128 mpsc::UnboundedReceiver<StorageCommand>,
129 mpsc::UnboundedSender<StorageResponse>,
130 )>,
131 ) {
132 let local_index = timely_worker.index() % self.workers_per_process;
135 let writer = self.timely_log_writers.lock().unwrap()[local_index].take();
136 if let Some(writer) = writer {
137 use timely::dataflow::operators::capture::{Event, EventPusher};
138 use timely::logging::TimelyEventBuilder;
139
140 let interval_ms = 1000u128; let mut time_ms = mz_repr::Timestamp::from(0u64);
145 let mut event_pusher = writer;
146 let now = std::time::Instant::now();
147 let start_offset = std::time::SystemTime::now()
148 .duration_since(std::time::SystemTime::UNIX_EPOCH)
149 .expect("Failed to get duration since Unix epoch");
150
151 let logger = timely::logging_core::Logger::<TimelyEventBuilder>::new(
152 now,
153 start_offset,
154 move |time: &std::time::Duration,
155 data: &mut Option<Vec<(std::time::Duration, TimelyEvent)>>| {
156 if let Some(mut data) = data.take() {
157 data.retain_mut(|(_, event)| {
163 if matches!(event, TimelyEvent::Park(_)) {
164 return false;
165 }
166 remap_timely_event_ids(event);
167 true
168 });
169 event_pusher.push(Event::Messages(time_ms, data));
170 } else {
171 let new_time_ms: u64 = (((time.as_millis() / interval_ms) + 1)
173 * interval_ms)
174 .try_into()
175 .expect("must fit");
176 let new_time_ms = mz_repr::Timestamp::from(new_time_ms);
177 if time_ms < new_time_ms {
178 event_pusher
179 .push(Event::Progress(vec![(new_time_ms, 1), (time_ms, -1)]));
180 time_ms = new_time_ms;
181 }
182 }
183 },
184 );
185
186 if let Some(mut register) = timely_worker.log_register() {
187 register.insert_logger("timely", logger);
188 }
189 }
190
191 Worker::new(
192 timely_worker,
193 client_rx,
194 self.metrics.clone(),
195 self.now.clone(),
196 self.connection_context.clone(),
197 self.instance_context.clone(),
198 Arc::clone(&self.persist_clients),
199 self.txns_ctx.clone(),
200 Arc::clone(&self.tracing_handle),
201 self.shared_rocksdb_write_buffer_manager.clone(),
202 )
203 .run();
204 }
205}
206
/// Offset (2^48) added to worker-global timely identifiers in storage's exported timely logs.
///
/// NOTE(review): presumably chosen so storage IDs stay distinct from those logged by another
/// timely runtime (e.g. compute) feeding the same introspection sink — confirm with the reader.
const STORAGE_ID_OFFSET: usize = 0x0001_0000_0000_0000;
212
213fn remap_timely_event_ids(event: &mut TimelyEvent) {
216 match event {
217 TimelyEvent::Operates(OperatesEvent { id, addr, .. }) => {
218 *id = id.wrapping_add(STORAGE_ID_OFFSET);
219 if let Some(first) = addr.first_mut() {
220 *first = first.wrapping_add(STORAGE_ID_OFFSET);
221 }
222 }
223 TimelyEvent::Channels(ChannelsEvent {
224 id,
225 scope_addr,
226 source,
227 target,
228 ..
229 }) => {
230 *id = id.wrapping_add(STORAGE_ID_OFFSET);
231 if let Some(first) = scope_addr.first_mut() {
232 *first = first.wrapping_add(STORAGE_ID_OFFSET);
233 }
234 source.0 = source.0.wrapping_add(STORAGE_ID_OFFSET);
235 target.0 = target.0.wrapping_add(STORAGE_ID_OFFSET);
236 }
237 TimelyEvent::Shutdown(ShutdownEvent { id }) => {
238 *id = id.wrapping_add(STORAGE_ID_OFFSET);
239 }
240 TimelyEvent::Schedule(ScheduleEvent { id, .. }) => {
241 *id = id.wrapping_add(STORAGE_ID_OFFSET);
242 }
243 TimelyEvent::Messages(MessagesEvent { channel, .. }) => {
244 *channel = channel.wrapping_add(STORAGE_ID_OFFSET);
245 }
247 TimelyEvent::PushProgress(e) => {
248 e.op_id = e.op_id.wrapping_add(STORAGE_ID_OFFSET);
249 }
250 TimelyEvent::CommChannels(e) => {
251 e.identifier = e.identifier.wrapping_add(STORAGE_ID_OFFSET);
252 }
253 TimelyEvent::Park(_) | TimelyEvent::Text(_) => {
254 }
256 }
257}