1use std::collections::{BTreeMap, BTreeSet};
14use std::fmt::Debug;
15use std::fmt::Write;
16use std::sync::Arc;
17
18use differential_dataflow::lattice::Lattice;
19use futures::future::BoxFuture;
20use futures::stream::FuturesUnordered;
21use futures::{FutureExt, StreamExt};
22use itertools::Itertools;
23use mz_ore::tracing::OpenTelemetryContext;
24use mz_persist_client::ShardId;
25use mz_persist_client::write::WriteHandle;
26use mz_persist_types::Codec64;
27use mz_repr::{GlobalId, TimestampManipulation};
28use mz_storage_client::client::{TableData, Update};
29use mz_storage_types::StorageDiff;
30use mz_storage_types::controller::{InvalidUpper, TxnsCodecRow};
31use mz_storage_types::sources::SourceData;
32use mz_txn_wal::txns::{Tidy, TxnsHandle};
33use timely::order::TotalOrder;
34use timely::progress::{Antichain, Timestamp};
35use tokio::sync::mpsc::UnboundedSender;
36use tokio::sync::oneshot;
37use tracing::{Instrument, Span, debug, info_span};
38
39use crate::StorageError;
40
41mod read_only_table_worker;
42
43#[derive(Debug, Clone)]
44pub struct PersistTableWriteWorker<T: Timestamp + Lattice + Codec64 + TimestampManipulation> {
45 inner: Arc<PersistTableWriteWorkerInner<T>>,
46}
47
48#[derive(Debug)]
50enum PersistTableWriteCmd<T: Timestamp + Lattice + Codec64> {
51 Register(
52 T,
53 Vec<(GlobalId, WriteHandle<SourceData, (), T, StorageDiff>)>,
54 tokio::sync::oneshot::Sender<()>,
55 ),
56 Update {
57 existing_collection: GlobalId,
59 new_collection: GlobalId,
61 forget_ts: T,
63 register_ts: T,
65 handle: WriteHandle<SourceData, (), T, StorageDiff>,
67 tx: oneshot::Sender<()>,
69 },
70 DropHandles {
71 forget_ts: T,
72 ids: Vec<GlobalId>,
74 tx: oneshot::Sender<()>,
76 },
77 Append {
78 write_ts: T,
79 advance_to: T,
80 updates: Vec<(GlobalId, Vec<TableData>)>,
81 tx: tokio::sync::oneshot::Sender<Result<(), StorageError<T>>>,
82 },
83 Shutdown,
84}
85
86impl<T: Timestamp + Lattice + Codec64> PersistTableWriteCmd<T> {
87 fn name(&self) -> &'static str {
88 match self {
89 PersistTableWriteCmd::Register(_, _, _) => "PersistTableWriteCmd::Register",
90 PersistTableWriteCmd::Update { .. } => "PersistTableWriteCmd::Update",
91 PersistTableWriteCmd::DropHandles { .. } => "PersistTableWriteCmd::DropHandle",
92 PersistTableWriteCmd::Append { .. } => "PersistTableWriteCmd::Append",
93 PersistTableWriteCmd::Shutdown => "PersistTableWriteCmd::Shutdown",
94 }
95 }
96}
97
98async fn append_work<T2: Timestamp + TotalOrder + Lattice + Codec64 + Sync>(
99 write_handles: &mut BTreeMap<GlobalId, WriteHandle<SourceData, (), T2, StorageDiff>>,
100 mut commands: BTreeMap<
101 GlobalId,
102 (tracing::Span, Vec<Update<T2>>, Antichain<T2>, Antichain<T2>),
103 >,
104) -> Result<(), Vec<(GlobalId, Antichain<T2>)>> {
105 let futs = FuturesUnordered::new();
106
107 for (id, write) in write_handles.iter_mut() {
115 if let Some((span, updates, expected_upper, new_upper)) = commands.remove(id) {
116 let updates = updates.into_iter().map(|u| {
117 (
118 (SourceData(Ok(u.row)), ()),
119 u.timestamp,
120 u.diff.into_inner(),
121 )
122 });
123
124 futs.push(async move {
125 write
126 .compare_and_append(updates.clone(), expected_upper.clone(), new_upper.clone())
127 .instrument(span.clone())
128 .await
129 .expect("cannot append updates")
130 .or_else(|upper_mismatch| Err((*id, upper_mismatch.current)))?;
131
132 Ok::<_, (GlobalId, Antichain<T2>)>((*id, new_upper))
133 })
134 }
135 }
136
137 let (_new_uppers, failed_appends): (Vec<_>, Vec<_>) = futs
139 .collect::<Vec<_>>()
140 .await
141 .into_iter()
142 .partition_result();
143
144 if failed_appends.is_empty() {
145 Ok(())
146 } else {
147 Err(failed_appends)
148 }
149}
150
151impl<T: Timestamp + Lattice + Codec64 + TimestampManipulation> PersistTableWriteWorker<T> {
152 pub(crate) fn new_read_only_mode(
161 txns_handle: WriteHandle<SourceData, (), T, StorageDiff>,
162 ) -> Self {
163 let (tx, rx) =
164 tokio::sync::mpsc::unbounded_channel::<(tracing::Span, PersistTableWriteCmd<T>)>();
165 mz_ore::task::spawn(
166 || "PersistTableWriteWorker",
167 read_only_table_worker::read_only_mode_table_worker(rx, txns_handle),
168 );
169 Self {
170 inner: Arc::new(PersistTableWriteWorkerInner::new(tx)),
171 }
172 }
173
174 pub(crate) fn new_txns(txns: TxnsHandle<SourceData, (), T, StorageDiff, TxnsCodecRow>) -> Self {
175 let (tx, rx) =
176 tokio::sync::mpsc::unbounded_channel::<(tracing::Span, PersistTableWriteCmd<T>)>();
177 mz_ore::task::spawn(|| "PersistTableWriteWorker", async move {
178 let mut worker = TxnsTableWorker {
179 txns,
180 write_handles: BTreeMap::new(),
181 tidy: Tidy::default(),
182 };
183 worker.run(rx).await
184 });
185 Self {
186 inner: Arc::new(PersistTableWriteWorkerInner::new(tx)),
187 }
188 }
189
190 pub(crate) fn register(
191 &self,
192 register_ts: T,
193 ids_handles: Vec<(GlobalId, WriteHandle<SourceData, (), T, StorageDiff>)>,
194 ) -> tokio::sync::oneshot::Receiver<()> {
195 let span = info_span!("PersistTableWriteCmd::Register");
197 let (tx, rx) = tokio::sync::oneshot::channel();
198 let cmd = PersistTableWriteCmd::Register(register_ts, ids_handles, tx);
199 self.inner.send_with_span(span, cmd);
200 rx
201 }
202
203 #[allow(dead_code)]
211 pub(crate) fn update(
212 &self,
213 existing_collection: GlobalId,
214 new_collection: GlobalId,
215 forget_ts: T,
216 register_ts: T,
217 handle: WriteHandle<SourceData, (), T, StorageDiff>,
218 ) -> oneshot::Receiver<()> {
219 let (tx, rx) = oneshot::channel();
220 self.send(PersistTableWriteCmd::Update {
221 existing_collection,
222 new_collection,
223 forget_ts,
224 register_ts,
225 handle,
226 tx,
227 });
228 rx
229 }
230
231 pub(crate) fn append(
232 &self,
233 write_ts: T,
234 advance_to: T,
235 updates: Vec<(GlobalId, Vec<TableData>)>,
236 ) -> tokio::sync::oneshot::Receiver<Result<(), StorageError<T>>> {
237 let (tx, rx) = tokio::sync::oneshot::channel();
238 if updates.is_empty() {
239 tx.send(Ok(()))
240 .expect("rx has not been dropped at this point");
241 rx
242 } else {
243 self.send(PersistTableWriteCmd::Append {
244 write_ts,
245 advance_to,
246 updates,
247 tx,
248 });
249 rx
250 }
251 }
252
253 pub(crate) fn drop_handles(&self, ids: Vec<GlobalId>, forget_ts: T) -> BoxFuture<'static, ()> {
258 let (tx, rx) = oneshot::channel();
259 self.send(PersistTableWriteCmd::DropHandles { forget_ts, ids, tx });
260 Box::pin(rx.map(|_| ()))
261 }
262
263 fn send(&self, cmd: PersistTableWriteCmd<T>) {
264 self.inner.send(cmd);
265 }
266}
267
268struct TxnsTableWorker<T: Timestamp + Lattice + TotalOrder + Codec64> {
269 txns: TxnsHandle<SourceData, (), T, StorageDiff, TxnsCodecRow>,
270 write_handles: BTreeMap<GlobalId, ShardId>,
271 tidy: Tidy,
272}
273
274impl<T: Timestamp + Lattice + Codec64 + TimestampManipulation> TxnsTableWorker<T> {
275 async fn run(
276 &mut self,
277 mut rx: tokio::sync::mpsc::UnboundedReceiver<(Span, PersistTableWriteCmd<T>)>,
278 ) {
279 while let Some((span, command)) = rx.recv().await {
280 match command {
281 PersistTableWriteCmd::Register(register_ts, ids_handles, tx) => {
282 self.register(register_ts, ids_handles)
283 .instrument(span)
284 .await;
285 let _ = tx.send(());
287 }
288 PersistTableWriteCmd::Update {
289 existing_collection,
290 new_collection,
291 forget_ts,
292 register_ts,
293 handle,
294 tx,
295 } => {
296 async {
297 self.drop_handles(vec![existing_collection], forget_ts)
298 .await;
299 self.register(register_ts, vec![(new_collection, handle)])
300 .await;
301 }
302 .instrument(span)
303 .await;
304 let _ = tx.send(());
306 }
307 PersistTableWriteCmd::DropHandles { forget_ts, ids, tx } => {
308 self.drop_handles(ids, forget_ts).instrument(span).await;
309 let _ = tx.send(());
311 }
312 PersistTableWriteCmd::Append {
313 write_ts,
314 advance_to,
315 updates,
316 tx,
317 } => {
318 self.append(write_ts, advance_to, updates, tx)
319 .instrument(span)
320 .await
321 }
322 PersistTableWriteCmd::Shutdown => {
323 tracing::info!("PersistTableWriteWorker shutting down via command");
324 return;
325 }
326 }
327 }
328
329 tracing::info!("PersistTableWriteWorker shutting down via input exhaustion");
330 }
331
332 async fn register(
333 &mut self,
334 register_ts: T,
335 mut ids_handles: Vec<(GlobalId, WriteHandle<SourceData, (), T, StorageDiff>)>,
336 ) {
337 ids_handles.sort_unstable_by_key(|(gid, _handle)| *gid);
341
342 for (id, write_handle) in ids_handles.iter() {
343 debug!(
344 "tables register {} {:.9}",
345 id,
346 write_handle.shard_id().to_string()
347 );
348 let previous = self.write_handles.insert(*id, write_handle.shard_id());
349 if previous.is_some() {
350 panic!("already registered a WriteHandle for collection {:?}", id);
351 }
352 }
353
354 let new_ids = ids_handles.iter().map(|(id, _)| *id).collect_vec();
356 let handles = ids_handles.into_iter().map(|(_, handle)| handle);
357 let res = self.txns.register(register_ts.clone(), handles).await;
358 match res {
359 Ok(tidy) => {
360 self.tidy.merge(tidy);
361 }
362 Err(current) => {
363 panic!(
364 "cannot register {:?} at {:?} because txns is at {:?}",
365 new_ids, register_ts, current
366 );
367 }
368 }
369 }
370
371 async fn drop_handles(&mut self, ids: Vec<GlobalId>, forget_ts: T) {
372 tracing::info!(?ids, "drop tables");
373 let data_ids = ids
374 .iter()
375 .filter_map(|id| self.write_handles.remove(id))
380 .collect::<BTreeSet<_>>();
381 if !data_ids.is_empty() {
382 match self.txns.forget(forget_ts.clone(), data_ids.clone()).await {
383 Ok(tidy) => {
384 self.tidy.merge(tidy);
385 }
386 Err(current) => {
387 panic!(
388 "cannot forget {:?} at {:?} because txns is at {:?}",
389 ids, forget_ts, current
390 );
391 }
392 }
393 }
394 }
395
396 async fn append(
397 &mut self,
398 write_ts: T,
399 advance_to: T,
400 updates: Vec<(GlobalId, Vec<TableData>)>,
401 tx: tokio::sync::oneshot::Sender<Result<(), StorageError<T>>>,
402 ) {
403 debug!(
404 "tables append timestamp={:?} advance_to={:?} len={} ids={:?}{}",
405 write_ts,
406 advance_to,
407 updates.iter().flat_map(|(_, x)| x).count(),
408 updates
409 .iter()
410 .map(|(x, _)| x.to_string())
411 .collect::<BTreeSet<_>>(),
412 updates.iter().filter(|(_, v)| !v.is_empty()).fold(
413 String::new(),
414 |mut output, (k, v)| {
415 let _ = write!(output, "\n {}: {:?}", k, v.first());
416 output
417 }
418 )
419 );
420 assert_eq!(
426 advance_to,
427 mz_persist_types::StepForward::step_forward(&write_ts)
428 );
429
430 let mut txn = self.txns.begin();
431 for (id, updates) in updates {
432 let Some(data_id) = self.write_handles.get(&id) else {
433 assert!(
437 updates.iter().all(|u| u.is_empty()),
438 "{}: {:?}",
439 id,
440 updates
441 );
442 continue;
443 };
444 for update in updates {
445 match update {
446 TableData::Rows(updates) => {
447 for (row, diff) in updates {
448 let () = txn
449 .write(data_id, SourceData(Ok(row)), (), diff.into_inner())
450 .await;
451 }
452 }
453 TableData::Batches(batches) => {
454 for batch in batches {
455 let () = txn.write_batch(data_id, batch);
456 }
457 }
458 }
459 }
460 }
461 txn.tidy(std::mem::take(&mut self.tidy));
463 let txn_res = txn.commit_at(&mut self.txns, write_ts.clone()).await;
464 let response = match txn_res {
465 Ok(apply) => {
466 debug!("applying {:?}", apply);
469 let tidy = apply.apply(&mut self.txns).await;
470 self.tidy.merge(tidy);
471
472 let () = self.txns.compact_to(write_ts).await;
476
477 Ok(())
478 }
479 Err(current) => {
480 self.tidy.merge(txn.take_tidy());
481 debug!(
482 "unable to commit txn at {:?} current={:?}",
483 write_ts, current
484 );
485 Err(StorageError::InvalidUppers(
486 self.write_handles
487 .keys()
488 .copied()
489 .map(|id| InvalidUpper {
490 id,
491 current_upper: Antichain::from_elem(current.clone()),
492 })
493 .collect(),
494 ))
495 }
496 };
497 let _ = tx.send(response);
499 }
500}
501
502#[derive(Debug)]
511struct PersistTableWriteWorkerInner<T: Timestamp + Lattice + Codec64 + TimestampManipulation> {
512 tx: UnboundedSender<(tracing::Span, PersistTableWriteCmd<T>)>,
514}
515
516impl<T> Drop for PersistTableWriteWorkerInner<T>
517where
518 T: Timestamp + Lattice + Codec64 + TimestampManipulation,
519{
520 fn drop(&mut self) {
521 self.send(PersistTableWriteCmd::Shutdown);
522 }
524}
525
526impl<T> PersistTableWriteWorkerInner<T>
527where
528 T: Timestamp + Lattice + Codec64 + TimestampManipulation,
529{
530 fn new(tx: UnboundedSender<(tracing::Span, PersistTableWriteCmd<T>)>) -> Self {
531 PersistTableWriteWorkerInner { tx }
532 }
533
534 fn send(&self, cmd: PersistTableWriteCmd<T>) {
535 let span =
536 info_span!(parent: None, "PersistTableWriteWorkerInner::send", otel.name = cmd.name());
537 OpenTelemetryContext::obtain().attach_as_parent_to(&span);
538 self.send_with_span(span, cmd)
539 }
540
541 fn send_with_span(&self, span: Span, cmd: PersistTableWriteCmd<T>) {
542 match self.tx.send((span, cmd)) {
543 Ok(()) => (), Err(e) => {
545 tracing::trace!("could not forward command: {:?}", e);
546 }
547 }
548 }
549}