mz_storage/storage_state/async_storage_worker.rs
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! A friendly companion async worker that can be used by a timely storage
//! worker to do work that requires async.
//!
//! CAUTION: This is not meant for high-throughput data processing but for
//! one-off requests that we need to do every now and then.

use std::collections::BTreeMap;
use std::fmt::Display;
use std::sync::Arc;
use std::thread::Thread;

use differential_dataflow::lattice::Lattice;
use mz_persist_client::Diagnostics;
use mz_persist_client::cache::PersistClientCache;
use mz_persist_client::read::ListenEvent;
use mz_persist_types::Codec64;
use mz_persist_types::codec_impls::UnitSchema;
use mz_repr::{GlobalId, Row, TimestampManipulation};
use mz_storage_types::StorageDiff;
use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::sinks::StorageSinkDesc;
use mz_storage_types::sources::{
    GenericSourceConnection, IngestionDescription, KafkaSourceConnection,
    LoadGeneratorSourceConnection, MySqlSourceConnection, PostgresSourceConnection,
    SourceConnection, SourceData, SourceEnvelope, SourceTimestamp, SqlServerSource,
};
use timely::order::{PartialOrder, TotalOrder};
use timely::progress::frontier::MutableAntichain;
use timely::progress::{Antichain, Timestamp};
use tokio::sync::mpsc;

use crate::source::types::SourceRender;

/// A worker that can execute commands that come in on a channel and return
/// responses on another channel. This is useful in places where we can't
/// normally run async code, such as the timely main loop.
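///
/// # Example
///
/// A minimal sketch of the intended usage pattern, assuming a `persist_clients`
/// cache and an `ingestion` description are already in scope (both names are
/// illustrative, not real wiring):
///
/// ```ignore
/// let worker = AsyncStorageWorker::new(std::thread::current(), persist_clients);
/// worker.update_ingestion_frontiers(id, ingestion);
/// // Later, on the (unparked) timely worker thread:
/// while let Ok(response) = worker.try_recv() {
///     // Handle `response`, e.g. by (re-)rendering the ingestion dataflow.
/// }
/// ```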
#[derive(Debug)]
pub struct AsyncStorageWorker<T: Timestamp + Lattice + Codec64> {
    tx: mpsc::UnboundedSender<AsyncStorageWorkerCommand<T>>,
    rx: crossbeam_channel::Receiver<AsyncStorageWorkerResponse<T>>,
}

/// Commands for [AsyncStorageWorker].
#[derive(Debug)]
pub enum AsyncStorageWorkerCommand<T> {
    /// Calculate a recent resumption frontier for the ingestion.
    UpdateIngestionFrontiers(GlobalId, IngestionDescription<CollectionMetadata>),

    /// Calculate a recent resumption frontier for the sink.
    UpdateSinkFrontiers(GlobalId, StorageSinkDesc<CollectionMetadata, T>),

    /// This command is used to properly order the creation and dropping of dataflows.
    /// Currently, this is a no-op in AsyncStorageWorker.
    ForwardDropDataflow(GlobalId),
}

/// Responses from [AsyncStorageWorker].
#[derive(Debug)]
pub enum AsyncStorageWorkerResponse<T: Timestamp + Lattice + Codec64> {
    /// An `IngestionDescription` with recent as-of and resume upper frontiers.
    IngestionFrontiersUpdated {
        /// ID of the ingestion/source.
        id: GlobalId,
        /// The description of the ingestion/source.
        ingestion_description: IngestionDescription<CollectionMetadata>,
        /// The frontier beyond which ingested updates should be uncompacted. Inputs to the
        /// ingestion are guaranteed to be readable at this frontier.
        as_of: Antichain<T>,
        /// A frontier in the Materialize time domain with the property that all updates not beyond
        /// it have already been durably ingested.
        resume_uppers: BTreeMap<GlobalId, Antichain<T>>,
        /// A frontier in the source time domain with the property that all updates not beyond it
        /// have already been durably ingested.
        source_resume_uppers: BTreeMap<GlobalId, Vec<Row>>,
    },
    /// A `StorageSinkDesc` with a recent as-of frontier.
    ExportFrontiersUpdated {
        /// ID of the sink.
        id: GlobalId,
        /// The updated description of the sink.
        description: StorageSinkDesc<CollectionMetadata, T>,
    },

    /// Indicates that the dataflow can be dropped.
    DropDataflow(GlobalId),
}

async fn reclock_resume_uppers<C, IntoTime>(
    id: &GlobalId,
    persist_clients: &PersistClientCache,
    ingestion_description: &IngestionDescription<CollectionMetadata>,
    as_of: Antichain<IntoTime>,
    resume_uppers: &BTreeMap<GlobalId, Antichain<IntoTime>>,
) -> BTreeMap<GlobalId, Antichain<C::Time>>
where
    C: SourceConnection + SourceRender,
    IntoTime: Timestamp + TotalOrder + Lattice + Codec64 + Display + Sync,
{
    let metadata = &ingestion_description.ingestion_metadata;

    let persist_client = persist_clients
        .open(metadata.persist_location.clone())
        .await
        .expect("location unavailable");

    // We must load enough data in the timestamper to reclock all the requested frontiers.
    let mut remap_updates = vec![];
    let mut remap_upper = as_of.clone();
    let mut subscription = None;
    for upper in resume_uppers.values() {
        // TODO(petrosagg): this feels icky, we shouldn't have exceptions in frontier reasoning
        // unless there is a good explanation as to why it is the case. It seems to me that this is
        // because in various moments in ingestion we mix uppers and sinces and try to derive one
        // from the other. Investigate if we could explicitly track natively timestamped
        // since/uppers in the controller.
        if upper.is_empty() {
            continue;
        }

        while PartialOrder::less_than(&remap_upper, upper) {
            let subscription = match subscription.as_mut() {
                Some(subscription) => subscription,
                None => {
                    let read_handle = persist_client
                        .open_leased_reader::<SourceData, (), IntoTime, StorageDiff>(
                            metadata.remap_shard.clone().unwrap(),
                            Arc::new(ingestion_description.desc.connection.timestamp_desc()),
                            Arc::new(UnitSchema),
                            Diagnostics {
                                shard_name: ingestion_description.remap_collection_id.to_string(),
                                handle_purpose: format!("reclock for {}", id),
                            },
                            false,
                        )
                        .await
                        .expect("shard unavailable");

                    let sub = read_handle
                        .subscribe(as_of.clone())
                        .await
                        .expect("always valid to read at since");

                    subscription.insert(sub)
                }
            };
            for event in subscription.fetch_next().await {
                match event {
                    ListenEvent::Updates(updates) => {
                        for ((k, v), t, d) in updates {
                            let row: Row = k.expect("invalid binding").0.expect("invalid binding");
                            let _v: () = v.expect("invalid binding");
                            let from_ts = C::Time::decode_row(&row);
                            remap_updates.push((from_ts, t, d));
                        }
                    }
                    ListenEvent::Progress(f) => remap_upper = f,
                }
            }
        }
    }

    remap_updates.sort_unstable_by(|a, b| a.1.cmp(&b.1));

    // The conversion of an IntoTime frontier to a FromTime frontier has the property that all
    // messages that would be reclocked to times beyond the provided `IntoTime` frontier will be
    // beyond the returned `FromTime` frontier. This can be used to compute a safe starting point
    // to resume producing an `IntoTime` collection at a particular frontier.
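    // As a concrete (purely illustrative) example, suppose `FromTime` is a Kafka-style offset
    // and the loaded remap bindings say that all offsets below 5 were assigned `IntoTime`
    // timestamps below 20, while offsets 5 and above will be assigned timestamps of 20 or more.
    // Converting the `IntoTime` frontier `[20]` then yields the `FromTime` frontier `{5}`:
    // resuming an output whose upper is `[20]` only requires replaying offsets 5 and above.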
    let mut source_upper = MutableAntichain::new();
    let mut source_upper_at_frontier = move |upper: &Antichain<IntoTime>| {
        if PartialOrder::less_equal(upper, &as_of) {
            Antichain::from_elem(Timestamp::minimum())
        } else {
            let idx = remap_updates.partition_point(|(_, t, _)| !upper.less_equal(t));
            source_upper.clear();
            source_upper.update_iter(
                remap_updates[0..idx]
                    .iter()
                    .map(|(from_time, _, diff)| (from_time.clone(), *diff)),
            );
            source_upper.frontier().to_owned()
        }
    };

    let mut source_resume_uppers = BTreeMap::new();
    for (id, upper) in resume_uppers {
        let source_upper = source_upper_at_frontier(upper);
        source_resume_uppers.insert(*id, source_upper);
    }
    source_resume_uppers
}

impl<T: Timestamp + TimestampManipulation + Lattice + Codec64 + Display + Sync>
    AsyncStorageWorker<T>
{
    /// Creates a new [`AsyncStorageWorker`].
    ///
    /// IMPORTANT: The passed in `thread` is unparked when new responses
    /// are added to the response channel. It is important to not sleep the thread
    /// that is reading from this via [`try_recv`](Self::try_recv) when
    /// [`is_empty`](Self::is_empty) has returned `false`.
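    ///
    /// A sketch of the polling contract this implies for the reading thread (illustrative, not
    /// taken from real call sites):
    ///
    /// ```ignore
    /// if worker.is_empty() {
    ///     std::thread::park(); // Safe: a newly arriving response unparks this thread.
    /// }
    /// while let Ok(response) = worker.try_recv() {
    ///     // Handle `response` without sleeping in between.
    /// }
    /// ```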
    pub fn new(thread: Thread, persist_clients: Arc<PersistClientCache>) -> Self {
        let (command_tx, mut command_rx) = mpsc::unbounded_channel();
        let (response_tx, response_rx) = crossbeam_channel::unbounded();

        let response_tx = ActivatingSender::new(response_tx, thread);

        mz_ore::task::spawn(|| "AsyncStorageWorker", async move {
            while let Some(command) = command_rx.recv().await {
                match command {
                    AsyncStorageWorkerCommand::UpdateIngestionFrontiers(
                        id,
                        ingestion_description,
                    ) => {
                        let mut resume_uppers = BTreeMap::new();

                        let seen_remap_shard = ingestion_description
                            .ingestion_metadata
                            .remap_shard
                            .expect("ingestions must have a remap shard");

                        for (id, export) in ingestion_description.source_exports.iter() {
                            // Explicit destructuring to force a compile error when the metadata changes.
                            let CollectionMetadata {
                                persist_location,
                                remap_shard,
                                data_shard,
                                relation_desc,
                                txns_shard,
                            } = &export.storage_metadata;
                            assert_eq!(
                                txns_shard, &None,
                                "source {} unexpectedly using txn-wal",
                                id
                            );
                            let client = persist_clients
                                .open(persist_location.clone())
                                .await
                                .expect("error creating persist client");

                            let mut write_handle = client
                                .open_writer::<SourceData, (), T, StorageDiff>(
                                    *data_shard,
                                    Arc::new(relation_desc.clone()),
                                    Arc::new(UnitSchema),
                                    Diagnostics {
                                        shard_name: id.to_string(),
                                        handle_purpose: format!("resumption data {}", id),
                                    },
                                )
                                .await
                                .unwrap();
                            let upper = write_handle.fetch_recent_upper().await;
                            let upper = match export.data_config.envelope {
                                // The CdcV2 envelope must re-ingest everything since the Mz
                                // frontier does not have a relation to upstream timestamps.
                                // TODO(petrosagg): move this reasoning to the controller
                                SourceEnvelope::CdcV2 if upper.is_empty() => Antichain::new(),
                                SourceEnvelope::CdcV2 => Antichain::from_elem(Timestamp::minimum()),
                                _ => upper.clone(),
                            };
                            resume_uppers.insert(*id, upper);
                            write_handle.expire().await;

                            if let Some(remap_shard) = remap_shard {
                                assert_eq!(
                                    seen_remap_shard, *remap_shard,
                                    "ingestion with multiple remap shards"
                                );
                            }
                        }

                        // Here we update the as-of frontier of the ingestion.
                        //
                        // The as-of frontier controls the frontier to which all inputs of the
                        // ingestion dataflow will be advanced. It is in our interest to set the
                        // as-of frontier to the largest possible value, which results in the
                        // maximum amount of consolidation, which in turn results in the minimum
                        // amount of memory required to hydrate.
                        //
                        // For each output `o` and for each input `i` of the ingestion the
                        // controller guarantees that i.since < o.upper, except when o.upper is
                        // [T::minimum()]. Therefore the largest as-of for a particular output `o`
                        // is `{ (t - 1).advance_by(i.since) | t in o.upper }`.
                        //
                        // To calculate the global as-of frontier we take the minimum of all those
                        // per-output as-of frontiers.
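                        //
                        // A small worked example (with made-up `mz_repr::Timestamp`-like values):
                        // if an output's upper is `[100]`, stepping back gives `99`; advancing
                        // `99` by a remap since of `[60]` leaves it at `99`, so that output
                        // contributes `[99]` to the as-of. Had the remap since been `[150]`,
                        // advancing would instead yield `150`.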
                        let client = persist_clients
                            .open(
                                ingestion_description
                                    .ingestion_metadata
                                    .persist_location
                                    .clone(),
                            )
                            .await
                            .expect("error creating persist client");
                        let read_handle = client
                            .open_leased_reader::<SourceData, (), T, StorageDiff>(
                                seen_remap_shard,
                                Arc::new(ingestion_description.desc.connection.timestamp_desc()),
                                Arc::new(UnitSchema),
                                Diagnostics {
                                    shard_name: ingestion_description
                                        .remap_collection_id
                                        .to_string(),
                                    handle_purpose: format!("resumption data for {}", id),
                                },
                                false,
                            )
                            .await
                            .unwrap();
                        let remap_since = read_handle.since().clone();
                        mz_ore::task::spawn(move || "deferred_expire", async move {
                            tokio::time::sleep(std::time::Duration::from_secs(300)).await;
                            read_handle.expire().await;
                        });
                        let mut as_of = Antichain::new();
                        for upper in resume_uppers.values() {
                            for t in upper.elements() {
                                let mut t_prime = t.step_back().unwrap_or_else(T::minimum);
                                if !remap_since.is_empty() {
                                    t_prime.advance_by(remap_since.borrow());
                                    as_of.insert(t_prime);
                                }
                            }
                        }

                        /// Convenience function to convert `BTreeMap<GlobalId, Antichain<C>>` to
                        /// `BTreeMap<GlobalId, Vec<Row>>`.
                        fn to_vec_row<T: SourceTimestamp>(
                            uppers: BTreeMap<GlobalId, Antichain<T>>,
                        ) -> BTreeMap<GlobalId, Vec<Row>> {
                            uppers
                                .into_iter()
                                .map(|(id, upper)| {
                                    (id, upper.into_iter().map(|ts| ts.encode_row()).collect())
                                })
                                .collect()
                        }

                        // Create a specialized description to be able to call the generic method.
                        let source_resume_uppers = match ingestion_description.desc.connection {
                            GenericSourceConnection::Kafka(_) => {
                                let uppers = reclock_resume_uppers::<KafkaSourceConnection, _>(
                                    &id,
                                    &persist_clients,
                                    &ingestion_description,
                                    as_of.clone(),
                                    &resume_uppers,
                                )
                                .await;
                                to_vec_row(uppers)
                            }
                            GenericSourceConnection::Postgres(_) => {
                                let uppers = reclock_resume_uppers::<PostgresSourceConnection, _>(
                                    &id,
                                    &persist_clients,
                                    &ingestion_description,
                                    as_of.clone(),
                                    &resume_uppers,
                                )
                                .await;
                                to_vec_row(uppers)
                            }
                            GenericSourceConnection::MySql(_) => {
                                let uppers = reclock_resume_uppers::<MySqlSourceConnection, _>(
                                    &id,
                                    &persist_clients,
                                    &ingestion_description,
                                    as_of.clone(),
                                    &resume_uppers,
                                )
                                .await;
                                to_vec_row(uppers)
                            }
                            GenericSourceConnection::SqlServer(_) => {
                                let uppers = reclock_resume_uppers::<SqlServerSource, _>(
                                    &id,
                                    &persist_clients,
                                    &ingestion_description,
                                    as_of.clone(),
                                    &resume_uppers,
                                )
                                .await;
                                to_vec_row(uppers)
                            }
                            GenericSourceConnection::LoadGenerator(_) => {
                                let uppers =
                                    reclock_resume_uppers::<LoadGeneratorSourceConnection, _>(
                                        &id,
                                        &persist_clients,
                                        &ingestion_description,
                                        as_of.clone(),
                                        &resume_uppers,
                                    )
                                    .await;
                                to_vec_row(uppers)
                            }
                        };

                        let res = response_tx.send(
                            AsyncStorageWorkerResponse::IngestionFrontiersUpdated {
                                id,
                                ingestion_description,
                                as_of,
                                resume_uppers,
                                source_resume_uppers,
                            },
                        );

                        if let Err(_err) = res {
                            // Receiver must have hung up.
                            break;
                        }
                    }
                    AsyncStorageWorkerCommand::UpdateSinkFrontiers(id, mut description) => {
                        let metadata = description.to_storage_metadata.clone();
                        let client = persist_clients
                            .open(metadata.persist_location.clone())
                            .await
                            .expect("error creating persist client");

                        let mut write_handle = client
                            .open_writer::<SourceData, (), T, StorageDiff>(
                                metadata.data_shard,
                                Arc::new(metadata.relation_desc),
                                Arc::new(UnitSchema),
                                Diagnostics {
                                    shard_name: id.to_string(),
                                    handle_purpose: format!("resumption data {}", id),
                                },
                            )
                            .await
                            .unwrap();
                        // Choose an as-of frontier for this execution of the sink. If the write
                        // frontier of the sink is strictly larger than its read hold, it must have
                        // at least written out its snapshot, and we can skip reading it; otherwise
                        // assume we may have to replay from the beginning.
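                        //
                        // A worked example with hypothetical values: if the sink's upper is `[42]`
                        // and its current `as_of` read hold is `[10]`, the new as-of becomes
                        // `[41]` (the stepped-back upper joined with the hold) and `with_snapshot`
                        // is cleared, because `[10] < [42]` shows the snapshot was already
                        // written. If instead the upper is still `[T::minimum()]`, the as-of stays
                        // at `[10]` and any requested snapshot is preserved.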
                        let upper = write_handle.fetch_recent_upper().await;
                        let mut read_hold = Antichain::from_iter(
                            upper
                                .iter()
                                .map(|t| t.step_back().unwrap_or_else(T::minimum)),
                        );
                        read_hold.join_assign(&description.as_of);
                        description.with_snapshot = description.with_snapshot
                            && !PartialOrder::less_than(&description.as_of, upper);
                        description.as_of = read_hold;
                        let res =
                            response_tx.send(AsyncStorageWorkerResponse::ExportFrontiersUpdated {
                                id,
                                description,
                            });

                        if let Err(_err) = res {
                            // Receiver must have hung up.
                            break;
                        }
                    }
                    AsyncStorageWorkerCommand::ForwardDropDataflow(id) => {
                        if let Err(_) =
                            response_tx.send(AsyncStorageWorkerResponse::DropDataflow(id))
                        {
                            // Receiver must have hung up.
                            break;
                        }
                    }
                }
            }
            tracing::trace!("shutting down async storage worker task");
        });

        Self {
            tx: command_tx,
            rx: response_rx,
        }
    }

    /// Updates the frontiers associated with the provided `IngestionDescription` to recent values.
    /// Currently this will calculate a fresh as-of for the ingestion and a fresh resumption
    /// frontier for each of the exports.
    pub fn update_ingestion_frontiers(
        &self,
        id: GlobalId,
        ingestion: IngestionDescription<CollectionMetadata>,
    ) {
        self.send(AsyncStorageWorkerCommand::UpdateIngestionFrontiers(
            id, ingestion,
        ))
    }

    /// Updates the frontiers associated with the provided `StorageSinkDesc` to recent values.
    /// Currently this will calculate a fresh as-of for the sink.
    pub fn update_sink_frontiers(
        &self,
        id: GlobalId,
        sink: StorageSinkDesc<CollectionMetadata, T>,
    ) {
        self.send(AsyncStorageWorkerCommand::UpdateSinkFrontiers(id, sink))
    }

    /// Enqueues a drop-dataflow command on the async storage worker channel to ensure proper
    /// ordering between dataflow creation and dropping.
    pub fn drop_dataflow(&self, id: GlobalId) {
        self.send(AsyncStorageWorkerCommand::ForwardDropDataflow(id))
    }

    fn send(&self, cmd: AsyncStorageWorkerCommand<T>) {
        self.tx
            .send(cmd)
            .expect("persist worker exited while its handle was alive")
    }

    /// Attempts to receive a message from the worker without blocking.
    ///
    /// This internally does a `try_recv` on a channel.
    pub fn try_recv(
        &self,
    ) -> Result<AsyncStorageWorkerResponse<T>, crossbeam_channel::TryRecvError> {
        self.rx.try_recv()
    }

    /// Returns `true` if there are currently no responses.
    pub fn is_empty(&self) -> bool {
        self.rx.is_empty()
    }
}

/// Helper that makes sure that we always unpark the target thread when we send a
/// message.
struct ActivatingSender<T> {
    tx: crossbeam_channel::Sender<T>,
    thread: Thread,
}

impl<T> ActivatingSender<T> {
    fn new(tx: crossbeam_channel::Sender<T>, thread: Thread) -> Self {
        Self { tx, thread }
    }

    fn send(&self, message: T) -> Result<(), crossbeam_channel::SendError<T>> {
        let res = self.tx.send(message);
        self.thread.unpark();
        res
    }
}