1use std::collections::{BTreeMap, BTreeSet};
14use std::fmt::Debug;
15use std::fmt::Write;
16use std::sync::Arc;
17
18use futures::future::BoxFuture;
19use futures::stream::FuturesUnordered;
20use futures::{FutureExt, StreamExt};
21use itertools::Itertools;
22use mz_ore::tracing::OpenTelemetryContext;
23use mz_persist_client::ShardId;
24use mz_persist_client::write::WriteHandle;
25use mz_repr::{GlobalId, Timestamp};
26use mz_storage_client::client::{TableData, Update};
27use mz_storage_types::StorageDiff;
28use mz_storage_types::controller::{InvalidUpper, TxnsCodecRow};
29use mz_storage_types::sources::SourceData;
30use mz_txn_wal::txns::{Tidy, TxnsHandle};
31use timely::progress::Antichain;
32use tokio::sync::mpsc::UnboundedSender;
33use tokio::sync::oneshot;
34use tracing::{Instrument, Span, debug, info_span};
35
36use crate::StorageError;
37
38mod read_only_table_worker;
39
40#[derive(Debug, Clone)]
41pub struct PersistTableWriteWorker {
42 inner: Arc<PersistTableWriteWorkerInner>,
43}
44
45#[derive(Debug)]
47enum PersistTableWriteCmd {
48 Register(
49 Timestamp,
50 Vec<(
51 GlobalId,
52 WriteHandle<SourceData, (), Timestamp, StorageDiff>,
53 )>,
54 tokio::sync::oneshot::Sender<()>,
55 ),
56 Update {
57 existing_collection: GlobalId,
59 new_collection: GlobalId,
61 forget_ts: Timestamp,
63 register_ts: Timestamp,
65 handle: WriteHandle<SourceData, (), Timestamp, StorageDiff>,
67 tx: oneshot::Sender<()>,
69 },
70 DropHandles {
71 forget_ts: Timestamp,
72 ids: Vec<GlobalId>,
74 tx: oneshot::Sender<()>,
76 },
77 Append {
78 write_ts: Timestamp,
79 advance_to: Timestamp,
80 updates: Vec<(GlobalId, Vec<TableData>)>,
81 tx: tokio::sync::oneshot::Sender<Result<(), StorageError>>,
82 },
83 Shutdown,
84}
85
86impl PersistTableWriteCmd {
87 fn name(&self) -> &'static str {
88 match self {
89 PersistTableWriteCmd::Register(_, _, _) => "PersistTableWriteCmd::Register",
90 PersistTableWriteCmd::Update { .. } => "PersistTableWriteCmd::Update",
91 PersistTableWriteCmd::DropHandles { .. } => "PersistTableWriteCmd::DropHandle",
92 PersistTableWriteCmd::Append { .. } => "PersistTableWriteCmd::Append",
93 PersistTableWriteCmd::Shutdown => "PersistTableWriteCmd::Shutdown",
94 }
95 }
96}
97
98async fn append_work(
99 write_handles: &mut BTreeMap<GlobalId, WriteHandle<SourceData, (), Timestamp, StorageDiff>>,
100 mut commands: BTreeMap<
101 GlobalId,
102 (
103 tracing::Span,
104 Vec<Update>,
105 Antichain<Timestamp>,
106 Antichain<Timestamp>,
107 ),
108 >,
109) -> Result<(), Vec<(GlobalId, Antichain<Timestamp>)>> {
110 let futs = FuturesUnordered::new();
111
112 for (id, write) in write_handles.iter_mut() {
120 if let Some((span, updates, expected_upper, new_upper)) = commands.remove(id) {
121 let updates = updates.into_iter().map(|u| {
122 (
123 (SourceData(Ok(u.row)), ()),
124 u.timestamp,
125 u.diff.into_inner(),
126 )
127 });
128
129 futs.push(async move {
130 write
131 .compare_and_append(updates.clone(), expected_upper.clone(), new_upper.clone())
132 .instrument(span.clone())
133 .await
134 .expect("cannot append updates")
135 .or_else(|upper_mismatch| Err((*id, upper_mismatch.current)))?;
136
137 Ok::<_, (GlobalId, Antichain<Timestamp>)>((*id, new_upper))
138 })
139 }
140 }
141
142 let (_new_uppers, failed_appends): (Vec<_>, Vec<_>) = futs
144 .collect::<Vec<_>>()
145 .await
146 .into_iter()
147 .partition_result();
148
149 if failed_appends.is_empty() {
150 Ok(())
151 } else {
152 Err(failed_appends)
153 }
154}
155
156impl PersistTableWriteWorker {
157 pub(crate) fn new_read_only_mode(
166 txns_handle: WriteHandle<SourceData, (), Timestamp, StorageDiff>,
167 ) -> Self {
168 let (tx, rx) =
169 tokio::sync::mpsc::unbounded_channel::<(tracing::Span, PersistTableWriteCmd)>();
170 mz_ore::task::spawn(
171 || "PersistTableWriteWorker",
172 read_only_table_worker::read_only_mode_table_worker(rx, txns_handle),
173 );
174 Self {
175 inner: Arc::new(PersistTableWriteWorkerInner::new(tx)),
176 }
177 }
178
179 pub(crate) fn new_txns(
180 txns: TxnsHandle<SourceData, (), Timestamp, StorageDiff, TxnsCodecRow>,
181 ) -> Self {
182 let (tx, rx) =
183 tokio::sync::mpsc::unbounded_channel::<(tracing::Span, PersistTableWriteCmd)>();
184 mz_ore::task::spawn(|| "PersistTableWriteWorker", async move {
185 let mut worker = TxnsTableWorker {
186 txns,
187 write_handles: BTreeMap::new(),
188 tidy: Tidy::default(),
189 };
190 worker.run(rx).await
191 });
192 Self {
193 inner: Arc::new(PersistTableWriteWorkerInner::new(tx)),
194 }
195 }
196
197 pub(crate) fn register(
198 &self,
199 register_ts: Timestamp,
200 ids_handles: Vec<(
201 GlobalId,
202 WriteHandle<SourceData, (), Timestamp, StorageDiff>,
203 )>,
204 ) -> tokio::sync::oneshot::Receiver<()> {
205 let span = info_span!("PersistTableWriteCmd::Register");
207 let (tx, rx) = tokio::sync::oneshot::channel();
208 let cmd = PersistTableWriteCmd::Register(register_ts, ids_handles, tx);
209 self.inner.send_with_span(span, cmd);
210 rx
211 }
212
213 #[allow(dead_code)]
221 pub(crate) fn update(
222 &self,
223 existing_collection: GlobalId,
224 new_collection: GlobalId,
225 forget_ts: Timestamp,
226 register_ts: Timestamp,
227 handle: WriteHandle<SourceData, (), Timestamp, StorageDiff>,
228 ) -> oneshot::Receiver<()> {
229 let (tx, rx) = oneshot::channel();
230 self.send(PersistTableWriteCmd::Update {
231 existing_collection,
232 new_collection,
233 forget_ts,
234 register_ts,
235 handle,
236 tx,
237 });
238 rx
239 }
240
241 pub(crate) fn append(
242 &self,
243 write_ts: Timestamp,
244 advance_to: Timestamp,
245 updates: Vec<(GlobalId, Vec<TableData>)>,
246 ) -> tokio::sync::oneshot::Receiver<Result<(), StorageError>> {
247 let (tx, rx) = tokio::sync::oneshot::channel();
248 if updates.is_empty() {
249 tx.send(Ok(()))
250 .expect("rx has not been dropped at this point");
251 rx
252 } else {
253 self.send(PersistTableWriteCmd::Append {
254 write_ts,
255 advance_to,
256 updates,
257 tx,
258 });
259 rx
260 }
261 }
262
263 pub(crate) fn drop_handles(
268 &self,
269 ids: Vec<GlobalId>,
270 forget_ts: Timestamp,
271 ) -> BoxFuture<'static, ()> {
272 let (tx, rx) = oneshot::channel();
273 self.send(PersistTableWriteCmd::DropHandles { forget_ts, ids, tx });
274 Box::pin(rx.map(|_| ()))
275 }
276
277 fn send(&self, cmd: PersistTableWriteCmd) {
278 self.inner.send(cmd);
279 }
280}
281
282struct TxnsTableWorker {
283 txns: TxnsHandle<SourceData, (), Timestamp, StorageDiff, TxnsCodecRow>,
284 write_handles: BTreeMap<GlobalId, ShardId>,
285 tidy: Tidy,
286}
287
impl TxnsTableWorker {
    /// Handles commands from `rx` one at a time, in arrival order.
    ///
    /// The loop exits when a `Shutdown` command arrives or the channel is closed. Each
    /// command runs instrumented with the span it was sent with. Completion is reported on
    /// the command's oneshot sender. A receiver that has already been dropped is ignored.
    async fn run(
        &mut self,
        mut rx: tokio::sync::mpsc::UnboundedReceiver<(Span, PersistTableWriteCmd)>,
    ) {
        while let Some((span, command)) = rx.recv().await {
            match command {
                PersistTableWriteCmd::Register(register_ts, ids_handles, tx) => {
                    self.register(register_ts, ids_handles)
                        .instrument(span)
                        .await;
                    // The caller may have stopped waiting; a closed receiver is fine.
                    let _ = tx.send(());
                }
                PersistTableWriteCmd::Update {
                    existing_collection,
                    new_collection,
                    forget_ts,
                    register_ts,
                    handle,
                    tx,
                } => {
                    // Forget the old collection first, then register its replacement,
                    // with both steps inside the command's span.
                    async {
                        self.drop_handles(vec![existing_collection], forget_ts)
                            .await;
                        self.register(register_ts, vec![(new_collection, handle)])
                            .await;
                    }
                    .instrument(span)
                    .await;
                    let _ = tx.send(());
                }
                PersistTableWriteCmd::DropHandles { forget_ts, ids, tx } => {
                    self.drop_handles(ids, forget_ts).instrument(span).await;
                    let _ = tx.send(());
                }
                PersistTableWriteCmd::Append {
                    write_ts,
                    advance_to,
                    updates,
                    tx,
                } => {
                    // `append` sends its own result on `tx`.
                    self.append(write_ts, advance_to, updates, tx)
                        .instrument(span)
                        .await
                }
                PersistTableWriteCmd::Shutdown => {
                    tracing::info!("PersistTableWriteWorker shutting down via command");
                    return;
                }
            }
        }

        tracing::info!("PersistTableWriteWorker shutting down via input exhaustion");
    }

    /// Registers each `(GlobalId, WriteHandle)` pair with the txns shard at
    /// `register_ts` and records each handle's shard id in `self.write_handles`.
    ///
    /// Tidy work returned by the txns handle is merged into `self.tidy`.
    ///
    /// # Panics
    ///
    /// Panics if any collection already has a recorded handle. Also panics if the txns
    /// handle rejects registration at `register_ts`; the panic message includes the
    /// timestamp returned in the error.
    async fn register(
        &mut self,
        register_ts: Timestamp,
        mut ids_handles: Vec<(
            GlobalId,
            WriteHandle<SourceData, (), Timestamp, StorageDiff>,
        )>,
    ) {
        // Handle collections in `GlobalId` order.
        // NOTE(review): presumably for a deterministic registration order; confirm.
        ids_handles.sort_unstable_by_key(|(gid, _handle)| *gid);

        for (id, write_handle) in ids_handles.iter() {
            // `{:.9}` truncates the shard id string to its first 9 characters.
            debug!(
                "tables register {} {:.9}",
                id,
                write_handle.shard_id().to_string()
            );
            let previous = self.write_handles.insert(*id, write_handle.shard_id());
            if previous.is_some() {
                panic!("already registered a WriteHandle for collection {:?}", id);
            }
        }

        // Capture the ids for the panic message, since the handles are consumed below.
        let new_ids = ids_handles.iter().map(|(id, _)| *id).collect_vec();
        let handles = ids_handles.into_iter().map(|(_, handle)| handle);
        let res = self.txns.register(register_ts, handles).await;
        match res {
            Ok(tidy) => {
                self.tidy.merge(tidy);
            }
            Err(current) => {
                panic!(
                    "cannot register {:?} at {:?} because txns is at {:?}",
                    new_ids, register_ts, current
                );
            }
        }
    }

    /// Forgets the data shards of `ids` in the txns shard at `forget_ts`.
    ///
    /// Ids without a recorded handle are skipped. If none of `ids` have one, the txns
    /// shard is not touched. Tidy work returned by the txns handle is merged into
    /// `self.tidy`.
    ///
    /// # Panics
    ///
    /// Panics if the txns handle rejects the forget at `forget_ts`.
    async fn drop_handles(&mut self, ids: Vec<GlobalId>, forget_ts: Timestamp) {
        tracing::info!(?ids, "drop tables");
        // Removing from `write_handles` here also stops later appends from finding
        // these collections.
        let data_ids = ids
            .iter()
            .filter_map(|id| self.write_handles.remove(id))
            .collect::<BTreeSet<_>>();
        if !data_ids.is_empty() {
            match self.txns.forget(forget_ts, data_ids.clone()).await {
                Ok(tidy) => {
                    self.tidy.merge(tidy);
                }
                Err(current) => {
                    panic!(
                        "cannot forget {:?} at {:?} because txns is at {:?}",
                        ids, forget_ts, current
                    );
                }
            }
        }
    }

    /// Writes `updates` in one txn committed at `write_ts` and sends the outcome on `tx`.
    ///
    /// On a successful commit, the txn is applied, its tidy work is kept for the next
    /// txn, and the txns shard is compacted to `write_ts`. On a failed commit, every
    /// collection in `self.write_handles` is reported as an `InvalidUpper` at the
    /// timestamp returned by the commit.
    ///
    /// # Panics
    ///
    /// Panics if `advance_to` is not `write_ts` stepped forward. Also panics if non-empty
    /// updates are supplied for a collection without a recorded handle.
    async fn append(
        &mut self,
        write_ts: Timestamp,
        advance_to: Timestamp,
        updates: Vec<(GlobalId, Vec<TableData>)>,
        tx: tokio::sync::oneshot::Sender<Result<(), StorageError>>,
    ) {
        // `len` counts `TableData` entries (not individual rows). The trailing string
        // lists the first entry of each non-empty collection.
        debug!(
            "tables append timestamp={:?} advance_to={:?} len={} ids={:?}{}",
            write_ts,
            advance_to,
            updates.iter().flat_map(|(_, x)| x).count(),
            updates
                .iter()
                .map(|(x, _)| x.to_string())
                .collect::<BTreeSet<_>>(),
            updates.iter().filter(|(_, v)| !v.is_empty()).fold(
                String::new(),
                |mut output, (k, v)| {
                    let _ = write!(output, "\n {}: {:?}", k, v.first());
                    output
                }
            )
        );
        // `advance_to` must be exactly one step past `write_ts`.
        // NOTE(review): presumably because committing at `write_ts` advances the txns
        // upper by one step; confirm.
        assert_eq!(
            advance_to,
            mz_persist_types::StepForward::step_forward(&write_ts)
        );

        let mut txn = self.txns.begin();
        for (id, updates) in updates {
            let Some(data_id) = self.write_handles.get(&id) else {
                // Collections without a recorded handle may only carry empty updates,
                // which are skipped.
                assert!(
                    updates.iter().all(|u| u.is_empty()),
                    "{}: {:?}",
                    id,
                    updates
                );
                continue;
            };
            for update in updates {
                match update {
                    TableData::Rows(updates) => {
                        for (row, diff) in updates {
                            let () = txn
                                .write(data_id, SourceData(Ok(row)), (), diff.into_inner())
                                .await;
                        }
                    }
                    TableData::Batches(batches) => {
                        for batch in batches {
                            let () = txn.write_batch(data_id, batch);
                        }
                    }
                }
            }
        }
        // Hand all pending tidy work to this txn; it is taken back below if the commit
        // fails.
        txn.tidy(std::mem::take(&mut self.tidy));
        let txn_res = txn.commit_at(&mut self.txns, write_ts).await;
        let response = match txn_res {
            Ok(apply) => {
                debug!("applying {:?}", apply);
                let tidy = apply.apply(&mut self.txns).await;
                self.tidy.merge(tidy);

                let () = self.txns.compact_to(write_ts).await;

                Ok(())
            }
            Err(current) => {
                // Reclaim the tidy work handed to the failed txn so it is not lost.
                self.tidy.merge(txn.take_tidy());
                debug!(
                    "unable to commit txn at {:?} current={:?}",
                    write_ts, current
                );
                // NOTE(review): every recorded collection is reported, not only those in
                // `updates`; presumably because all tables share the txns shard's upper.
                // Confirm.
                Err(StorageError::InvalidUppers(
                    self.write_handles
                        .keys()
                        .copied()
                        .map(|id| InvalidUpper {
                            id,
                            current_upper: Antichain::from_elem(current),
                        })
                        .collect(),
                ))
            }
        };
        // The caller may have stopped waiting; a closed receiver is ignored.
        let _ = tx.send(response);
    }
}
518
519#[derive(Debug)]
528struct PersistTableWriteWorkerInner {
529 tx: UnboundedSender<(tracing::Span, PersistTableWriteCmd)>,
531}
532
533impl Drop for PersistTableWriteWorkerInner {
534 fn drop(&mut self) {
535 self.send(PersistTableWriteCmd::Shutdown);
536 }
538}
539
540impl PersistTableWriteWorkerInner {
541 fn new(tx: UnboundedSender<(tracing::Span, PersistTableWriteCmd)>) -> Self {
542 PersistTableWriteWorkerInner { tx }
543 }
544
545 fn send(&self, cmd: PersistTableWriteCmd) {
546 let span =
547 info_span!(parent: None, "PersistTableWriteWorkerInner::send", otel.name = cmd.name());
548 OpenTelemetryContext::obtain().attach_as_parent_to(&span);
549 self.send_with_span(span, cmd)
550 }
551
552 fn send_with_span(&self, span: Span, cmd: PersistTableWriteCmd) {
553 match self.tx.send((span, cmd)) {
554 Ok(()) => (), Err(e) => {
556 tracing::trace!("could not forward command: {:?}", e);
557 }
558 }
559 }
560}