1use std::collections::{BTreeMap, BTreeSet};
14use std::fmt::Debug;
15use std::fmt::Write;
16use std::sync::Arc;
17
18use futures::future::BoxFuture;
19use futures::stream::FuturesUnordered;
20use futures::{FutureExt, StreamExt};
21use itertools::Itertools;
22use mz_ore::tracing::OpenTelemetryContext;
23use mz_persist_client::ShardId;
24use mz_persist_client::write::WriteHandle;
25use mz_repr::{GlobalId, Timestamp};
26use mz_storage_client::client::{TableData, Update};
27use mz_storage_types::StorageDiff;
28use mz_storage_types::controller::{InvalidUpper, TxnsCodecRow};
29use mz_storage_types::sources::SourceData;
30use mz_txn_wal::txns::{Tidy, TxnsHandle};
31use timely::progress::Antichain;
32use tokio::sync::mpsc::UnboundedSender;
33use tokio::sync::oneshot;
34use tracing::{Instrument, Span, debug, info_span};
35
36use crate::StorageError;
37
38mod read_only_table_worker;
39
40#[derive(Debug, Clone)]
41pub struct PersistTableWriteWorker {
42 inner: Arc<PersistTableWriteWorkerInner>,
43}
44
45#[derive(Debug)]
47enum PersistTableWriteCmd {
48 Register(
49 Timestamp,
50 Vec<(
51 GlobalId,
52 WriteHandle<SourceData, (), Timestamp, StorageDiff>,
53 )>,
54 tokio::sync::oneshot::Sender<()>,
55 ),
56 Update {
57 existing_collection: GlobalId,
59 new_collection: GlobalId,
61 forget_ts: Timestamp,
63 register_ts: Timestamp,
65 handle: WriteHandle<SourceData, (), Timestamp, StorageDiff>,
67 tx: oneshot::Sender<()>,
69 },
70 DropHandles {
71 forget_ts: Timestamp,
72 ids: Vec<GlobalId>,
74 tx: oneshot::Sender<()>,
76 },
77 Append {
78 write_ts: Timestamp,
79 advance_to: Timestamp,
80 updates: Vec<(GlobalId, Vec<TableData>)>,
81 tx: tokio::sync::oneshot::Sender<Result<(), StorageError>>,
82 },
83 Shutdown,
84}
85
86impl PersistTableWriteCmd {
87 fn name(&self) -> &'static str {
88 match self {
89 PersistTableWriteCmd::Register(_, _, _) => "PersistTableWriteCmd::Register",
90 PersistTableWriteCmd::Update { .. } => "PersistTableWriteCmd::Update",
91 PersistTableWriteCmd::DropHandles { .. } => "PersistTableWriteCmd::DropHandle",
92 PersistTableWriteCmd::Append { .. } => "PersistTableWriteCmd::Append",
93 PersistTableWriteCmd::Shutdown => "PersistTableWriteCmd::Shutdown",
94 }
95 }
96}
97
98async fn append_work(
99 write_handles: &mut BTreeMap<GlobalId, WriteHandle<SourceData, (), Timestamp, StorageDiff>>,
100 mut commands: BTreeMap<
101 GlobalId,
102 (
103 tracing::Span,
104 Vec<Update>,
105 Antichain<Timestamp>,
106 Antichain<Timestamp>,
107 ),
108 >,
109) -> Result<(), Vec<(GlobalId, Antichain<Timestamp>)>> {
110 let futs = FuturesUnordered::new();
111
112 for (id, write) in write_handles.iter_mut() {
120 if let Some((span, updates, expected_upper, new_upper)) = commands.remove(id) {
121 let updates = updates.into_iter().map(|u| {
122 (
123 (SourceData(Ok(u.row)), ()),
124 u.timestamp,
125 u.diff.into_inner(),
126 )
127 });
128
129 futs.push(async move {
130 write
131 .compare_and_append(updates.clone(), expected_upper.clone(), new_upper.clone())
132 .instrument(span.clone())
133 .await
134 .expect("cannot append updates")
135 .or_else(|upper_mismatch| Err((*id, upper_mismatch.current)))?;
136
137 Ok::<_, (GlobalId, Antichain<Timestamp>)>((*id, new_upper))
138 })
139 }
140 }
141
142 let (_new_uppers, failed_appends): (Vec<_>, Vec<_>) = futs
144 .collect::<Vec<_>>()
145 .await
146 .into_iter()
147 .partition_result();
148
149 if failed_appends.is_empty() {
150 Ok(())
151 } else {
152 Err(failed_appends)
153 }
154}
155
156impl PersistTableWriteWorker {
157 pub(crate) fn new_read_only_mode(
166 txns_handle: WriteHandle<SourceData, (), Timestamp, StorageDiff>,
167 ) -> Self {
168 let (tx, rx) =
169 tokio::sync::mpsc::unbounded_channel::<(tracing::Span, PersistTableWriteCmd)>();
170 mz_ore::task::spawn(
171 || "PersistTableWriteWorker",
172 read_only_table_worker::read_only_mode_table_worker(rx, txns_handle),
173 );
174 Self {
175 inner: Arc::new(PersistTableWriteWorkerInner::new(tx)),
176 }
177 }
178
179 pub(crate) fn new_txns(
180 txns: TxnsHandle<SourceData, (), Timestamp, StorageDiff, TxnsCodecRow>,
181 ) -> Self {
182 let (tx, rx) =
183 tokio::sync::mpsc::unbounded_channel::<(tracing::Span, PersistTableWriteCmd)>();
184 mz_ore::task::spawn(|| "PersistTableWriteWorker", async move {
185 let mut worker = TxnsTableWorker {
186 txns,
187 write_handles: BTreeMap::new(),
188 tidy: Tidy::default(),
189 };
190 worker.run(rx).await
191 });
192 Self {
193 inner: Arc::new(PersistTableWriteWorkerInner::new(tx)),
194 }
195 }
196
197 pub(crate) fn register(
198 &self,
199 register_ts: Timestamp,
200 ids_handles: Vec<(
201 GlobalId,
202 WriteHandle<SourceData, (), Timestamp, StorageDiff>,
203 )>,
204 ) -> tokio::sync::oneshot::Receiver<()> {
205 let span = info_span!("PersistTableWriteCmd::Register");
207 let (tx, rx) = tokio::sync::oneshot::channel();
208 let cmd = PersistTableWriteCmd::Register(register_ts, ids_handles, tx);
209 self.inner.send_with_span(span, cmd);
210 rx
211 }
212
213 #[allow(dead_code)]
221 pub(crate) fn update(
222 &self,
223 existing_collection: GlobalId,
224 new_collection: GlobalId,
225 forget_ts: Timestamp,
226 register_ts: Timestamp,
227 handle: WriteHandle<SourceData, (), Timestamp, StorageDiff>,
228 ) -> oneshot::Receiver<()> {
229 let (tx, rx) = oneshot::channel();
230 self.send(PersistTableWriteCmd::Update {
231 existing_collection,
232 new_collection,
233 forget_ts,
234 register_ts,
235 handle,
236 tx,
237 });
238 rx
239 }
240
241 pub(crate) fn append(
242 &self,
243 write_ts: Timestamp,
244 advance_to: Timestamp,
245 updates: Vec<(GlobalId, Vec<TableData>)>,
246 ) -> tokio::sync::oneshot::Receiver<Result<(), StorageError>> {
247 let (tx, rx) = tokio::sync::oneshot::channel();
248 self.send(PersistTableWriteCmd::Append {
253 write_ts,
254 advance_to,
255 updates,
256 tx,
257 });
258 rx
259 }
260
261 pub(crate) fn drop_handles(
266 &self,
267 ids: Vec<GlobalId>,
268 forget_ts: Timestamp,
269 ) -> BoxFuture<'static, ()> {
270 let (tx, rx) = oneshot::channel();
271 self.send(PersistTableWriteCmd::DropHandles { forget_ts, ids, tx });
272 Box::pin(rx.map(|_| ()))
273 }
274
275 fn send(&self, cmd: PersistTableWriteCmd) {
276 self.inner.send(cmd);
277 }
278}
279
/// State owned by the task that `PersistTableWriteWorker::new_txns` spawns.
struct TxnsTableWorker {
    /// Handle on which `register`, `forget`, `begin`, and `compact_to` are invoked.
    txns: TxnsHandle<SourceData, (), Timestamp, StorageDiff, TxnsCodecRow>,
    /// Shard id recorded for each currently registered collection, keyed by collection id.
    write_handles: BTreeMap<GlobalId, ShardId>,
    /// Tidy work merged in from earlier operations; `append` moves it into its transaction.
    tidy: Tidy,
}
285
286impl TxnsTableWorker {
287 async fn run(
288 &mut self,
289 mut rx: tokio::sync::mpsc::UnboundedReceiver<(Span, PersistTableWriteCmd)>,
290 ) {
291 while let Some((span, command)) = rx.recv().await {
292 match command {
293 PersistTableWriteCmd::Register(register_ts, ids_handles, tx) => {
294 self.register(register_ts, ids_handles)
295 .instrument(span)
296 .await;
297 let _ = tx.send(());
299 }
300 PersistTableWriteCmd::Update {
301 existing_collection,
302 new_collection,
303 forget_ts,
304 register_ts,
305 handle,
306 tx,
307 } => {
308 async {
309 self.drop_handles(vec![existing_collection], forget_ts)
310 .await;
311 self.register(register_ts, vec![(new_collection, handle)])
312 .await;
313 }
314 .instrument(span)
315 .await;
316 let _ = tx.send(());
318 }
319 PersistTableWriteCmd::DropHandles { forget_ts, ids, tx } => {
320 self.drop_handles(ids, forget_ts).instrument(span).await;
321 let _ = tx.send(());
323 }
324 PersistTableWriteCmd::Append {
325 write_ts,
326 advance_to,
327 updates,
328 tx,
329 } => {
330 self.append(write_ts, advance_to, updates, tx)
331 .instrument(span)
332 .await
333 }
334 PersistTableWriteCmd::Shutdown => {
335 tracing::info!("PersistTableWriteWorker shutting down via command");
336 return;
337 }
338 }
339 }
340
341 tracing::info!("PersistTableWriteWorker shutting down via input exhaustion");
342 }
343
344 async fn register(
345 &mut self,
346 register_ts: Timestamp,
347 mut ids_handles: Vec<(
348 GlobalId,
349 WriteHandle<SourceData, (), Timestamp, StorageDiff>,
350 )>,
351 ) {
352 ids_handles.sort_unstable_by_key(|(gid, _handle)| *gid);
356
357 for (id, write_handle) in ids_handles.iter() {
358 debug!(
359 "tables register {} {:.9}",
360 id,
361 write_handle.shard_id().to_string()
362 );
363 let previous = self.write_handles.insert(*id, write_handle.shard_id());
364 if previous.is_some() {
365 panic!("already registered a WriteHandle for collection {:?}", id);
366 }
367 }
368
369 let new_ids = ids_handles.iter().map(|(id, _)| *id).collect_vec();
371 let handles = ids_handles.into_iter().map(|(_, handle)| handle);
372 let res = self.txns.register(register_ts, handles).await;
373 match res {
374 Ok(tidy) => {
375 self.tidy.merge(tidy);
376 }
377 Err(current) => {
378 panic!(
379 "cannot register {:?} at {:?} because txns is at {:?}",
380 new_ids, register_ts, current
381 );
382 }
383 }
384 }
385
386 async fn drop_handles(&mut self, ids: Vec<GlobalId>, forget_ts: Timestamp) {
387 tracing::info!(?ids, "drop tables");
388 let data_ids = ids
389 .iter()
390 .filter_map(|id| self.write_handles.remove(id))
395 .collect::<BTreeSet<_>>();
396 if !data_ids.is_empty() {
397 match self.txns.forget(forget_ts, data_ids.clone()).await {
398 Ok(tidy) => {
399 self.tidy.merge(tidy);
400 }
401 Err(current) => {
402 panic!(
403 "cannot forget {:?} at {:?} because txns is at {:?}",
404 ids, forget_ts, current
405 );
406 }
407 }
408 }
409 }
410
411 async fn append(
412 &mut self,
413 write_ts: Timestamp,
414 advance_to: Timestamp,
415 updates: Vec<(GlobalId, Vec<TableData>)>,
416 tx: tokio::sync::oneshot::Sender<Result<(), StorageError>>,
417 ) {
418 debug!(
419 "tables append timestamp={:?} advance_to={:?} len={} ids={:?}{}",
420 write_ts,
421 advance_to,
422 updates.iter().flat_map(|(_, x)| x).count(),
423 updates
424 .iter()
425 .map(|(x, _)| x.to_string())
426 .collect::<BTreeSet<_>>(),
427 updates.iter().filter(|(_, v)| !v.is_empty()).fold(
428 String::new(),
429 |mut output, (k, v)| {
430 let _ = write!(output, "\n {}: {:?}", k, v.first());
431 output
432 }
433 )
434 );
435 assert_eq!(
441 advance_to,
442 mz_persist_types::StepForward::step_forward(&write_ts)
443 );
444
445 let mut txn = self.txns.begin();
446 for (id, updates) in updates {
447 let Some(data_id) = self.write_handles.get(&id) else {
448 assert!(
452 updates.iter().all(|u| u.is_empty()),
453 "{}: {:?}",
454 id,
455 updates
456 );
457 continue;
458 };
459 for update in updates {
460 match update {
461 TableData::Rows(updates) => {
462 for (row, diff) in updates {
463 let () = txn
464 .write(data_id, SourceData(Ok(row)), (), diff.into_inner())
465 .await;
466 }
467 }
468 TableData::Batches(batches) => {
469 for batch in batches {
470 let () = txn.write_batch(data_id, batch);
471 }
472 }
473 }
474 }
475 }
476 txn.tidy(std::mem::take(&mut self.tidy));
478 let txn_res = txn.commit_at(&mut self.txns, write_ts).await;
479 let response = match txn_res {
480 Ok(apply) => {
481 debug!("applying {:?}", apply);
484 let tidy = apply.apply(&mut self.txns).await;
485 self.tidy.merge(tidy);
486
487 let () = self.txns.compact_to(write_ts).await;
491
492 Ok(())
493 }
494 Err(current) => {
495 self.tidy.merge(txn.take_tidy());
496 debug!(
497 "unable to commit txn at {:?} current={:?}",
498 write_ts, current
499 );
500 Err(StorageError::InvalidUppers(
501 self.write_handles
502 .keys()
503 .copied()
504 .map(|id| InvalidUpper {
505 id,
506 current_upper: Antichain::from_elem(current),
507 })
508 .collect(),
509 ))
510 }
511 };
512 let _ = tx.send(response);
514 }
515}
516
517#[derive(Debug)]
526struct PersistTableWriteWorkerInner {
527 tx: UnboundedSender<(tracing::Span, PersistTableWriteCmd)>,
529}
530
531impl Drop for PersistTableWriteWorkerInner {
532 fn drop(&mut self) {
533 self.send(PersistTableWriteCmd::Shutdown);
534 }
536}
537
538impl PersistTableWriteWorkerInner {
539 fn new(tx: UnboundedSender<(tracing::Span, PersistTableWriteCmd)>) -> Self {
540 PersistTableWriteWorkerInner { tx }
541 }
542
543 fn send(&self, cmd: PersistTableWriteCmd) {
544 let span =
545 info_span!(parent: None, "PersistTableWriteWorkerInner::send", otel.name = cmd.name());
546 OpenTelemetryContext::obtain().attach_as_parent_to(&span);
547 self.send_with_span(span, cmd)
548 }
549
550 fn send_with_span(&self, span: Span, cmd: PersistTableWriteCmd) {
551 match self.tx.send((span, cmd)) {
552 Ok(()) => (), Err(e) => {
554 tracing::trace!("could not forward command: {:?}", e);
555 }
556 }
557 }
558}