mz_storage/storage_state/async_storage_worker.rs
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! A friendly companion async worker that can be used by a timely storage
//! worker to do work that requires async.
//!
//! CAUTION: This is not meant for high-throughput data processing but for
//! one-off requests that we need to do every now and then.
use std::collections::BTreeMap;
use std::fmt::Display;
use std::sync::Arc;
use std::thread::Thread;

use differential_dataflow::lattice::Lattice;
use mz_persist_client::Diagnostics;
use mz_persist_client::cache::PersistClientCache;
use mz_persist_client::read::ListenEvent;
use mz_persist_types::Codec64;
use mz_persist_types::codec_impls::UnitSchema;
use mz_repr::{GlobalId, Row, TimestampManipulation};
use mz_storage_types::StorageDiff;
use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::sinks::StorageSinkDesc;
use mz_storage_types::sources::{
    GenericSourceConnection, IngestionDescription, KafkaSourceConnection,
    LoadGeneratorSourceConnection, MySqlSourceConnection, PostgresSourceConnection,
    SourceConnection, SourceData, SourceEnvelope, SourceTimestamp, SqlServerSourceConnection,
};
use timely::order::{PartialOrder, TotalOrder};
use timely::progress::frontier::MutableAntichain;
use timely::progress::{Antichain, Timestamp};
use tokio::sync::mpsc;

use crate::source::types::SourceRender;

/// A worker that can execute commands that come in on a channel and return
/// responses on another channel. This is useful in places where we can't
/// normally run async code, such as the timely main loop.
#[derive(Debug)]
pub struct AsyncStorageWorker<T: Timestamp + Lattice + Codec64> {
    tx: mpsc::UnboundedSender<AsyncStorageWorkerCommand<T>>,
    rx: crossbeam_channel::Receiver<AsyncStorageWorkerResponse<T>>,
}

/// Commands for [AsyncStorageWorker].
#[derive(Debug)]
pub enum AsyncStorageWorkerCommand<T> {
    /// Calculate a recent resumption frontier for the ingestion.
    UpdateIngestionFrontiers(GlobalId, IngestionDescription<CollectionMetadata>),

    /// Calculate a recent resumption frontier for the sink.
    UpdateSinkFrontiers(GlobalId, StorageSinkDesc<CollectionMetadata, T>),

    /// This command is used to properly order the creation and dropping of dataflows.
    /// Currently, this is a no-op in AsyncStorageWorker.
    ForwardDropDataflow(GlobalId),
}

/// Responses from [AsyncStorageWorker].
#[derive(Debug)]
pub enum AsyncStorageWorkerResponse<T: Timestamp + Lattice + Codec64> {
    /// An `IngestionDescription` with recent as-of and resume upper frontiers.
    IngestionFrontiersUpdated {
        /// ID of the ingestion/source.
        id: GlobalId,
        /// The description of the ingestion/source.
        ingestion_description: IngestionDescription<CollectionMetadata>,
        /// The frontier beyond which ingested updates should be uncompacted. Inputs to the
        /// ingestion are guaranteed to be readable at this frontier.
        as_of: Antichain<T>,
        /// A frontier in the Materialize time domain with the property that all updates not beyond
        /// it have already been durably ingested.
        resume_uppers: BTreeMap<GlobalId, Antichain<T>>,
        /// A frontier in the source time domain with the property that all updates not beyond it
        /// have already been durably ingested.
        source_resume_uppers: BTreeMap<GlobalId, Vec<Row>>,
    },
    /// A `StorageSinkDesc` with a recent as-of frontier.
    ExportFrontiersUpdated {
        /// ID of the sink.
        id: GlobalId,
        /// The updated description of the sink.
        description: StorageSinkDesc<CollectionMetadata, T>,
    },

    /// Indicates that the dataflow can be dropped.
    DropDataflow(GlobalId),
}

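/// Reclocks the given `IntoTime` resume uppers into the source's `FromTime` domain by reading
/// the bindings in the ingestion's remap shard. The returned frontiers have the property that
/// all updates that would be reclocked to times not beyond the corresponding `IntoTime` resume
/// upper are not beyond the returned `FromTime` frontier.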
async fn reclock_resume_uppers<C, IntoTime>(
    id: &GlobalId,
    persist_clients: &PersistClientCache,
    ingestion_description: &IngestionDescription<CollectionMetadata>,
    as_of: Antichain<IntoTime>,
    resume_uppers: &BTreeMap<GlobalId, Antichain<IntoTime>>,
) -> BTreeMap<GlobalId, Antichain<C::Time>>
where
    C: SourceConnection + SourceRender,
    IntoTime: Timestamp + TotalOrder + Lattice + Codec64 + Display + Sync,
{
    let remap_metadata = &ingestion_description.remap_metadata;

    let persist_client = persist_clients
        .open(remap_metadata.persist_location.clone())
        .await
        .expect("location unavailable");

    // We must load enough data in the timestamper to reclock all the requested frontiers.
    let mut remap_updates = vec![];
    let mut remap_upper = as_of.clone();
    let mut subscription = None;
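    // The subscription to the remap shard is opened lazily below, and only if at least one
    // requested resume upper lies beyond the as-of.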
    for upper in resume_uppers.values() {
        // TODO(petrosagg): this feels icky, we shouldn't have exceptions in frontier reasoning
        // unless there is a good explanation as to why it is the case. It seems to me that this is
        // because in various moments in ingestion we mix uppers and sinces and try to derive one
        // from the other. Investigate if we could explicitly track natively timestamped
        // since/uppers in the controller.
        if upper.is_empty() {
            continue;
        }

        while PartialOrder::less_than(&remap_upper, upper) {
            let subscription = match subscription.as_mut() {
                Some(subscription) => subscription,
                None => {
                    let read_handle = persist_client
                        .open_leased_reader::<SourceData, (), IntoTime, StorageDiff>(
                            remap_metadata.data_shard.clone(),
                            Arc::new(remap_metadata.relation_desc.clone()),
                            Arc::new(UnitSchema),
                            Diagnostics {
                                shard_name: ingestion_description.remap_collection_id.to_string(),
                                handle_purpose: format!("reclock for {}", id),
                            },
                            false,
                        )
                        .await
                        .expect("shard unavailable");

                    let sub = read_handle
                        .subscribe(as_of.clone())
                        .await
                        .expect("always valid to read at since");

                    subscription.insert(sub)
                }
            };
            for event in subscription.fetch_next().await {
                match event {
                    ListenEvent::Updates(updates) => {
                        for ((k, v), t, d) in updates {
                            let row: Row = k.expect("invalid binding").0.expect("invalid binding");
                            let _v: () = v.expect("invalid binding");
                            let from_ts = C::Time::decode_row(&row);
                            remap_updates.push((from_ts, t, d));
                        }
                    }
                    ListenEvent::Progress(f) => remap_upper = f,
                }
            }
        }
    }

    remap_updates.sort_unstable_by(|a, b| a.1.cmp(&b.1));
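    // The bindings are now sorted by their `IntoTime`, so that the `partition_point` below can
    // find the prefix of bindings that is not beyond a given `IntoTime` frontier.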

    // The conversion of an IntoTime frontier to a FromTime frontier has the property that all
    // messages that would be reclocked to times beyond the provided `IntoTime` frontier will be
    // beyond the returned `FromTime` frontier. This can be used to compute a safe starting point
    // to resume producing an `IntoTime` collection at a particular frontier.
    let mut source_upper = MutableAntichain::new();
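    // The closure below answers "what is the source upper at this `IntoTime` frontier?". For
    // frontiers not beyond the as-of we have no bindings to consult, so we conservatively answer
    // with the minimum `FromTime` frontier; otherwise we accumulate all bindings not beyond the
    // frontier and report the `FromTime` frontier they describe.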
    let mut source_upper_at_frontier = move |upper: &Antichain<IntoTime>| {
        if PartialOrder::less_equal(upper, &as_of) {
            Antichain::from_elem(Timestamp::minimum())
        } else {
            let idx = remap_updates.partition_point(|(_, t, _)| !upper.less_equal(t));
            source_upper.clear();
            source_upper.update_iter(
                remap_updates[0..idx]
                    .iter()
                    .map(|(from_time, _, diff)| (from_time.clone(), *diff)),
            );
            source_upper.frontier().to_owned()
        }
    };

    let mut source_resume_uppers = BTreeMap::new();
    for (id, upper) in resume_uppers {
        let source_upper = source_upper_at_frontier(upper);
        source_resume_uppers.insert(*id, source_upper);
    }
    source_resume_uppers
}

impl<T: Timestamp + TimestampManipulation + Lattice + Codec64 + Display + Sync>
    AsyncStorageWorker<T>
{
    /// Creates a new [`AsyncStorageWorker`].
    ///
    /// IMPORTANT: The passed-in `thread` is unparked when new responses are
    /// added to the response channel. It is important to not sleep the thread
    /// that is reading from this via [`try_recv`](Self::try_recv) when
    /// [`is_empty`](Self::is_empty) has returned `false`.
    pub fn new(thread: Thread, persist_clients: Arc<PersistClientCache>) -> Self {
        let (command_tx, mut command_rx) = mpsc::unbounded_channel();
        let (response_tx, response_rx) = crossbeam_channel::unbounded();
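        // Commands arrive on an async mpsc channel, since they are processed by an async task,
        // while responses go back on a crossbeam channel that the timely worker can poll without
        // blocking.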

        let response_tx = ActivatingSender::new(response_tx, thread);

        mz_ore::task::spawn(|| "AsyncStorageWorker", async move {
            while let Some(command) = command_rx.recv().await {
                match command {
                    AsyncStorageWorkerCommand::UpdateIngestionFrontiers(
                        id,
                        ingestion_description,
                    ) => {
                        let mut resume_uppers = BTreeMap::new();

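                        // Fetch the current persist upper of every export of this ingestion;
                        // data not beyond these frontiers has already been durably recorded and
                        // does not need to be produced again.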
                        for (id, export) in ingestion_description.source_exports.iter() {
                            // Explicit destructuring to force a compile error when the metadata changes
                            let CollectionMetadata {
                                persist_location,
                                data_shard,
                                relation_desc,
                                txns_shard,
                            } = &export.storage_metadata;
                            assert_eq!(
                                txns_shard, &None,
                                "source {} unexpectedly using txn-wal",
                                id
                            );
                            let client = persist_clients
                                .open(persist_location.clone())
                                .await
                                .expect("error creating persist client");

                            let mut write_handle = client
                                .open_writer::<SourceData, (), T, StorageDiff>(
                                    *data_shard,
                                    Arc::new(relation_desc.clone()),
                                    Arc::new(UnitSchema),
                                    Diagnostics {
                                        shard_name: id.to_string(),
                                        handle_purpose: format!("resumption data {}", id),
                                    },
                                )
                                .await
                                .unwrap();
                            let upper = write_handle.fetch_recent_upper().await;
                            let upper = match export.data_config.envelope {
                                // The CdcV2 envelope must re-ingest everything since the Mz frontier does not have a relation to upstream timestamps.
                                // TODO(petrosagg): move this reasoning to the controller
                                SourceEnvelope::CdcV2 if upper.is_empty() => Antichain::new(),
                                SourceEnvelope::CdcV2 => Antichain::from_elem(Timestamp::minimum()),
                                _ => upper.clone(),
                            };
                            resume_uppers.insert(*id, upper);
                            write_handle.expire().await;
                        }

                        // Here we update the as-of frontier of the ingestion.
                        //
                        // The as-of frontier controls the frontier to which all inputs of the
                        // ingestion dataflow will be advanced. It is in our interest to set the
                        // as-of frontier to the largest possible value, which will result in the
                        // maximum amount of consolidation, which in turn results in the minimum
                        // amount of memory required to hydrate.
                        //
                        // For each output `o` and for each input `i` of the ingestion the
                        // controller guarantees that i.since < o.upper except when o.upper is
                        // [T::minimum()]. Therefore the largest as-of for a particular output `o`
                        // is `{ (t - 1).advance_by(i.since) | t in o.upper }`.
                        //
                        // To calculate the global as_of frontier we take the minimum of all those
                        // per-output as-of frontiers.
                        let client = persist_clients
                            .open(
                                ingestion_description
                                    .remap_metadata
                                    .persist_location
                                    .clone(),
                            )
                            .await
                            .expect("error creating persist client");
                        let read_handle = client
                            .open_leased_reader::<SourceData, (), T, StorageDiff>(
                                ingestion_description.remap_metadata.data_shard,
                                Arc::new(
                                    ingestion_description.remap_metadata.relation_desc.clone(),
                                ),
                                Arc::new(UnitSchema),
                                Diagnostics {
                                    shard_name: ingestion_description
                                        .remap_collection_id
                                        .to_string(),
                                    handle_purpose: format!("resumption data for {}", id),
                                },
                                false,
                            )
                            .await
                            .unwrap();
                        let remap_since = read_handle.since().clone();
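                        // Rather than expiring the read handle immediately, hold on to it (and
                        // the since hold it represents) for a grace period in a background task.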
                        mz_ore::task::spawn(move || "deferred_expire", async move {
                            tokio::time::sleep(std::time::Duration::from_secs(300)).await;
                            read_handle.expire().await;
                        });
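                        // Compute the per-output as-of `{ (t - 1).advance_by(remap_since) | t in
                        // o.upper }` and combine all outputs into a single as-of antichain. If
                        // the remap shard's since is empty, nothing is inserted and the as-of
                        // remains the empty frontier.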
                        let mut as_of = Antichain::new();
                        for upper in resume_uppers.values() {
                            for t in upper.elements() {
                                let mut t_prime = t.step_back().unwrap_or_else(T::minimum);
                                if !remap_since.is_empty() {
                                    t_prime.advance_by(remap_since.borrow());
                                    as_of.insert(t_prime);
                                }
                            }
                        }

                        /// Convenience function to convert `BTreeMap<GlobalId, Antichain<C>>` to
                        /// `BTreeMap<GlobalId, Vec<Row>>`.
                        fn to_vec_row<T: SourceTimestamp>(
                            uppers: BTreeMap<GlobalId, Antichain<T>>,
                        ) -> BTreeMap<GlobalId, Vec<Row>> {
                            uppers
                                .into_iter()
                                .map(|(id, upper)| {
                                    (id, upper.into_iter().map(|ts| ts.encode_row()).collect())
                                })
                                .collect()
                        }

                        // Dispatch on the concrete connection type so that we can call the
                        // generic reclocking method.
                        let source_resume_uppers = match ingestion_description.desc.connection {
                            GenericSourceConnection::Kafka(_) => {
                                let uppers = reclock_resume_uppers::<KafkaSourceConnection, _>(
                                    &id,
                                    &persist_clients,
                                    &ingestion_description,
                                    as_of.clone(),
                                    &resume_uppers,
                                )
                                .await;
                                to_vec_row(uppers)
                            }
                            GenericSourceConnection::Postgres(_) => {
                                let uppers = reclock_resume_uppers::<PostgresSourceConnection, _>(
                                    &id,
                                    &persist_clients,
                                    &ingestion_description,
                                    as_of.clone(),
                                    &resume_uppers,
                                )
                                .await;
                                to_vec_row(uppers)
                            }
                            GenericSourceConnection::MySql(_) => {
                                let uppers = reclock_resume_uppers::<MySqlSourceConnection, _>(
                                    &id,
                                    &persist_clients,
                                    &ingestion_description,
                                    as_of.clone(),
                                    &resume_uppers,
                                )
                                .await;
                                to_vec_row(uppers)
                            }
                            GenericSourceConnection::SqlServer(_) => {
                                let uppers = reclock_resume_uppers::<SqlServerSourceConnection, _>(
                                    &id,
                                    &persist_clients,
                                    &ingestion_description,
                                    as_of.clone(),
                                    &resume_uppers,
                                )
                                .await;
                                to_vec_row(uppers)
                            }
                            GenericSourceConnection::LoadGenerator(_) => {
                                let uppers =
                                    reclock_resume_uppers::<LoadGeneratorSourceConnection, _>(
                                        &id,
                                        &persist_clients,
                                        &ingestion_description,
                                        as_of.clone(),
                                        &resume_uppers,
                                    )
                                    .await;
                                to_vec_row(uppers)
                            }
                        };

                        let res = response_tx.send(
                            AsyncStorageWorkerResponse::IngestionFrontiersUpdated {
                                id,
                                ingestion_description,
                                as_of,
                                resume_uppers,
                                source_resume_uppers,
                            },
                        );

                        if let Err(_err) = res {
                            // Receiver must have hung up.
                            break;
                        }
                    }
                    AsyncStorageWorkerCommand::UpdateSinkFrontiers(id, mut description) => {
                        let metadata = description.to_storage_metadata.clone();
                        let client = persist_clients
                            .open(metadata.persist_location.clone())
                            .await
                            .expect("error creating persist client");

                        let mut write_handle = client
                            .open_writer::<SourceData, (), T, StorageDiff>(
                                metadata.data_shard,
                                Arc::new(metadata.relation_desc),
                                Arc::new(UnitSchema),
                                Diagnostics {
                                    shard_name: id.to_string(),
                                    handle_purpose: format!("resumption data {}", id),
                                },
                            )
                            .await
                            .unwrap();
                        // Choose an as-of frontier for this execution of the sink. If the write
                        // frontier of the sink is strictly larger than its read hold, it must have
                        // at least written out its snapshot, and we can skip reading it; otherwise
                        // assume we may have to replay from the beginning.
                        let upper = write_handle.fetch_recent_upper().await;
                        let mut read_hold = Antichain::from_iter(
                            upper
                                .iter()
                                .map(|t| t.step_back().unwrap_or_else(T::minimum)),
                        );
                        read_hold.join_assign(&description.as_of);
                        description.with_snapshot = description.with_snapshot
                            && !PartialOrder::less_than(&description.as_of, upper);
                        description.as_of = read_hold;
                        let res =
                            response_tx.send(AsyncStorageWorkerResponse::ExportFrontiersUpdated {
                                id,
                                description,
                            });

                        if let Err(_err) = res {
                            // Receiver must have hung up.
                            break;
                        }
                    }
                    AsyncStorageWorkerCommand::ForwardDropDataflow(id) => {
                        if let Err(_) =
                            response_tx.send(AsyncStorageWorkerResponse::DropDataflow(id))
                        {
                            // Receiver must have hung up.
                            break;
                        }
                    }
                }
            }
            tracing::trace!("shutting down async storage worker task");
        });

        Self {
            tx: command_tx,
            rx: response_rx,
        }
    }

    /// Updates the frontiers associated with the provided `IngestionDescription` to recent values.
    /// Currently this will calculate a fresh as-of for the ingestion and a fresh resumption
    /// frontier for each of the exports.
    pub fn update_ingestion_frontiers(
        &self,
        id: GlobalId,
        ingestion: IngestionDescription<CollectionMetadata>,
    ) {
        self.send(AsyncStorageWorkerCommand::UpdateIngestionFrontiers(
            id, ingestion,
        ))
    }

    /// Updates the frontiers associated with the provided `StorageSinkDesc` to recent values.
    /// Currently this will calculate a fresh as-of for the sink.
    pub fn update_sink_frontiers(
        &self,
        id: GlobalId,
        sink: StorageSinkDesc<CollectionMetadata, T>,
    ) {
        self.send(AsyncStorageWorkerCommand::UpdateSinkFrontiers(id, sink))
    }

    /// Enqueues a drop-dataflow command in the async storage worker channel to ensure proper
    /// ordering of creating and dropping dataflows.
    pub fn drop_dataflow(&self, id: GlobalId) {
        self.send(AsyncStorageWorkerCommand::ForwardDropDataflow(id))
    }

    fn send(&self, cmd: AsyncStorageWorkerCommand<T>) {
        self.tx
            .send(cmd)
            .expect("persist worker exited while its handle was alive")
    }

    /// Attempts to receive a message from the worker without blocking.
    ///
    /// This internally does a `try_recv` on a channel.
    pub fn try_recv(
        &self,
    ) -> Result<AsyncStorageWorkerResponse<T>, crossbeam_channel::TryRecvError> {
        self.rx.try_recv()
    }

    /// Returns `true` if there are currently no responses.
    pub fn is_empty(&self) -> bool {
        self.rx.is_empty()
    }
}

/// Helper that makes sure that we always unpark the target thread when we send a
/// message.
struct ActivatingSender<T> {
    tx: crossbeam_channel::Sender<T>,
    thread: Thread,
}

impl<T> ActivatingSender<T> {
    fn new(tx: crossbeam_channel::Sender<T>, thread: Thread) -> Self {
        Self { tx, thread }
    }

    fn send(&self, message: T) -> Result<(), crossbeam_channel::SendError<T>> {
        let res = self.tx.send(message);
        self.thread.unpark();
        res
    }
}