1use std::collections::{BTreeMap, BTreeSet};
14use std::fmt::Debug;
15use std::fmt::Write;
16use std::sync::Arc;
17
18use differential_dataflow::lattice::Lattice;
19use futures::future::BoxFuture;
20use futures::stream::FuturesUnordered;
21use futures::{FutureExt, StreamExt};
22use itertools::Itertools;
23use mz_ore::tracing::OpenTelemetryContext;
24use mz_persist_client::ShardId;
25use mz_persist_client::write::WriteHandle;
26use mz_persist_types::Codec64;
27use mz_repr::{GlobalId, TimestampManipulation};
28use mz_storage_client::client::{TableData, Update};
29use mz_storage_types::StorageDiff;
30use mz_storage_types::controller::{InvalidUpper, TxnsCodecRow};
31use mz_storage_types::sources::SourceData;
32use mz_txn_wal::txns::{Tidy, TxnsHandle};
33use timely::order::TotalOrder;
34use timely::progress::{Antichain, Timestamp};
35use tokio::sync::mpsc::UnboundedSender;
36use tokio::sync::oneshot;
37use tracing::{Instrument, Span, debug, info_span};
38
39use crate::{PersistEpoch, StorageError};
40
41mod read_only_table_worker;
42
43#[derive(Debug, Clone)]
44pub struct PersistTableWriteWorker<T: Timestamp + Lattice + Codec64 + TimestampManipulation> {
45 inner: Arc<PersistTableWriteWorkerInner<T>>,
46}
47
48#[derive(Debug)]
50enum PersistTableWriteCmd<T: Timestamp + Lattice + Codec64> {
51 Register(
52 T,
53 Vec<(GlobalId, WriteHandle<SourceData, (), T, StorageDiff>)>,
54 tokio::sync::oneshot::Sender<()>,
55 ),
56 Update {
57 existing_collection: GlobalId,
59 new_collection: GlobalId,
61 forget_ts: T,
63 register_ts: T,
65 handle: WriteHandle<SourceData, (), T, StorageDiff>,
67 tx: oneshot::Sender<()>,
69 },
70 DropHandles {
71 forget_ts: T,
72 ids: Vec<GlobalId>,
74 tx: oneshot::Sender<()>,
76 },
77 Append {
78 write_ts: T,
79 advance_to: T,
80 updates: Vec<(GlobalId, Vec<TableData>)>,
81 tx: tokio::sync::oneshot::Sender<Result<(), StorageError<T>>>,
82 },
83 Shutdown,
84}
85
86impl<T: Timestamp + Lattice + Codec64> PersistTableWriteCmd<T> {
87 fn name(&self) -> &'static str {
88 match self {
89 PersistTableWriteCmd::Register(_, _, _) => "PersistTableWriteCmd::Register",
90 PersistTableWriteCmd::Update { .. } => "PersistTableWriteCmd::Update",
91 PersistTableWriteCmd::DropHandles { .. } => "PersistTableWriteCmd::DropHandle",
92 PersistTableWriteCmd::Append { .. } => "PersistTableWriteCmd::Append",
93 PersistTableWriteCmd::Shutdown => "PersistTableWriteCmd::Shutdown",
94 }
95 }
96}
97
98async fn append_work<T2: Timestamp + Lattice + Codec64 + Sync>(
99 write_handles: &mut BTreeMap<GlobalId, WriteHandle<SourceData, (), T2, StorageDiff>>,
100 mut commands: BTreeMap<
101 GlobalId,
102 (tracing::Span, Vec<Update<T2>>, Antichain<T2>, Antichain<T2>),
103 >,
104) -> Result<(), Vec<(GlobalId, Antichain<T2>)>> {
105 let futs = FuturesUnordered::new();
106
107 for (id, write) in write_handles.iter_mut() {
115 if let Some((span, updates, expected_upper, new_upper)) = commands.remove(id) {
116 let updates = updates.into_iter().map(|u| {
117 (
118 (SourceData(Ok(u.row)), ()),
119 u.timestamp,
120 u.diff.into_inner(),
121 )
122 });
123
124 futs.push(async move {
125 write
126 .compare_and_append(updates.clone(), expected_upper.clone(), new_upper.clone())
127 .instrument(span.clone())
128 .await
129 .expect("cannot append updates")
130 .or_else(|upper_mismatch| Err((*id, upper_mismatch.current)))?;
131
132 Ok::<_, (GlobalId, Antichain<T2>)>((*id, new_upper))
133 })
134 }
135 }
136
137 let (_new_uppers, failed_appends): (Vec<_>, Vec<_>) = futs
139 .collect::<Vec<_>>()
140 .await
141 .into_iter()
142 .partition_result();
143
144 if failed_appends.is_empty() {
145 Ok(())
146 } else {
147 Err(failed_appends)
148 }
149}
150
151impl<T: Timestamp + Lattice + Codec64 + TimestampManipulation> PersistTableWriteWorker<T> {
152 pub(crate) fn new_read_only_mode(
161 txns_handle: WriteHandle<SourceData, (), T, StorageDiff>,
162 ) -> Self {
163 let (tx, rx) =
164 tokio::sync::mpsc::unbounded_channel::<(tracing::Span, PersistTableWriteCmd<T>)>();
165 mz_ore::task::spawn(
166 || "PersistTableWriteWorker",
167 read_only_table_worker::read_only_mode_table_worker(rx, txns_handle),
168 );
169 Self {
170 inner: Arc::new(PersistTableWriteWorkerInner::new(tx)),
171 }
172 }
173
174 pub(crate) fn new_txns(
175 txns: TxnsHandle<SourceData, (), T, StorageDiff, PersistEpoch, TxnsCodecRow>,
176 ) -> Self {
177 let (tx, rx) =
178 tokio::sync::mpsc::unbounded_channel::<(tracing::Span, PersistTableWriteCmd<T>)>();
179 mz_ore::task::spawn(|| "PersistTableWriteWorker", async move {
180 let mut worker = TxnsTableWorker {
181 txns,
182 write_handles: BTreeMap::new(),
183 tidy: Tidy::default(),
184 };
185 worker.run(rx).await
186 });
187 Self {
188 inner: Arc::new(PersistTableWriteWorkerInner::new(tx)),
189 }
190 }
191
192 pub(crate) fn register(
193 &self,
194 register_ts: T,
195 ids_handles: Vec<(GlobalId, WriteHandle<SourceData, (), T, StorageDiff>)>,
196 ) -> tokio::sync::oneshot::Receiver<()> {
197 let span = info_span!("PersistTableWriteCmd::Register");
199 let (tx, rx) = tokio::sync::oneshot::channel();
200 let cmd = PersistTableWriteCmd::Register(register_ts, ids_handles, tx);
201 self.inner.send_with_span(span, cmd);
202 rx
203 }
204
205 #[allow(dead_code)]
213 pub(crate) fn update(
214 &self,
215 existing_collection: GlobalId,
216 new_collection: GlobalId,
217 forget_ts: T,
218 register_ts: T,
219 handle: WriteHandle<SourceData, (), T, StorageDiff>,
220 ) -> oneshot::Receiver<()> {
221 let (tx, rx) = oneshot::channel();
222 self.send(PersistTableWriteCmd::Update {
223 existing_collection,
224 new_collection,
225 forget_ts,
226 register_ts,
227 handle,
228 tx,
229 });
230 rx
231 }
232
233 pub(crate) fn append(
234 &self,
235 write_ts: T,
236 advance_to: T,
237 updates: Vec<(GlobalId, Vec<TableData>)>,
238 ) -> tokio::sync::oneshot::Receiver<Result<(), StorageError<T>>> {
239 let (tx, rx) = tokio::sync::oneshot::channel();
240 if updates.is_empty() {
241 tx.send(Ok(()))
242 .expect("rx has not been dropped at this point");
243 rx
244 } else {
245 self.send(PersistTableWriteCmd::Append {
246 write_ts,
247 advance_to,
248 updates,
249 tx,
250 });
251 rx
252 }
253 }
254
255 pub(crate) fn drop_handles(&self, ids: Vec<GlobalId>, forget_ts: T) -> BoxFuture<'static, ()> {
260 let (tx, rx) = oneshot::channel();
261 self.send(PersistTableWriteCmd::DropHandles { forget_ts, ids, tx });
262 Box::pin(rx.map(|_| ()))
263 }
264
265 fn send(&self, cmd: PersistTableWriteCmd<T>) {
266 self.inner.send(cmd);
267 }
268}
269
270struct TxnsTableWorker<T: Timestamp + Lattice + TotalOrder + Codec64> {
271 txns: TxnsHandle<SourceData, (), T, StorageDiff, PersistEpoch, TxnsCodecRow>,
272 write_handles: BTreeMap<GlobalId, ShardId>,
273 tidy: Tidy,
274}
275
276impl<T: Timestamp + Lattice + Codec64 + TimestampManipulation> TxnsTableWorker<T> {
277 async fn run(
278 &mut self,
279 mut rx: tokio::sync::mpsc::UnboundedReceiver<(Span, PersistTableWriteCmd<T>)>,
280 ) {
281 while let Some((span, command)) = rx.recv().await {
282 match command {
283 PersistTableWriteCmd::Register(register_ts, ids_handles, tx) => {
284 self.register(register_ts, ids_handles)
285 .instrument(span)
286 .await;
287 let _ = tx.send(());
289 }
290 PersistTableWriteCmd::Update {
291 existing_collection,
292 new_collection,
293 forget_ts,
294 register_ts,
295 handle,
296 tx,
297 } => {
298 async {
299 self.drop_handles(vec![existing_collection], forget_ts)
300 .await;
301 self.register(register_ts, vec![(new_collection, handle)])
302 .await;
303 }
304 .instrument(span)
305 .await;
306 let _ = tx.send(());
308 }
309 PersistTableWriteCmd::DropHandles { forget_ts, ids, tx } => {
310 self.drop_handles(ids, forget_ts).instrument(span).await;
311 let _ = tx.send(());
313 }
314 PersistTableWriteCmd::Append {
315 write_ts,
316 advance_to,
317 updates,
318 tx,
319 } => {
320 self.append(write_ts, advance_to, updates, tx)
321 .instrument(span)
322 .await
323 }
324 PersistTableWriteCmd::Shutdown => {
325 tracing::info!("PersistTableWriteWorker shutting down via command");
326 return;
327 }
328 }
329 }
330
331 tracing::info!("PersistTableWriteWorker shutting down via input exhaustion");
332 }
333
334 async fn register(
335 &mut self,
336 register_ts: T,
337 mut ids_handles: Vec<(GlobalId, WriteHandle<SourceData, (), T, StorageDiff>)>,
338 ) {
339 ids_handles.sort_unstable_by_key(|(gid, _handle)| *gid);
343
344 for (id, write_handle) in ids_handles.iter() {
345 debug!(
346 "tables register {} {:.9}",
347 id,
348 write_handle.shard_id().to_string()
349 );
350 let previous = self.write_handles.insert(*id, write_handle.shard_id());
351 if previous.is_some() {
352 panic!("already registered a WriteHandle for collection {:?}", id);
353 }
354 }
355
356 let new_ids = ids_handles.iter().map(|(id, _)| *id).collect_vec();
358 let handles = ids_handles.into_iter().map(|(_, handle)| handle);
359 let res = self.txns.register(register_ts.clone(), handles).await;
360 match res {
361 Ok(tidy) => {
362 self.tidy.merge(tidy);
363 }
364 Err(current) => {
365 panic!(
366 "cannot register {:?} at {:?} because txns is at {:?}",
367 new_ids, register_ts, current
368 );
369 }
370 }
371 }
372
373 async fn drop_handles(&mut self, ids: Vec<GlobalId>, forget_ts: T) {
374 tracing::info!(?ids, "drop tables");
375 let data_ids = ids
376 .iter()
377 .filter_map(|id| self.write_handles.remove(id))
382 .collect::<BTreeSet<_>>();
383 if !data_ids.is_empty() {
384 match self.txns.forget(forget_ts.clone(), data_ids.clone()).await {
385 Ok(tidy) => {
386 self.tidy.merge(tidy);
387 }
388 Err(current) => {
389 panic!(
390 "cannot forget {:?} at {:?} because txns is at {:?}",
391 ids, forget_ts, current
392 );
393 }
394 }
395 }
396 }
397
398 async fn append(
399 &mut self,
400 write_ts: T,
401 advance_to: T,
402 updates: Vec<(GlobalId, Vec<TableData>)>,
403 tx: tokio::sync::oneshot::Sender<Result<(), StorageError<T>>>,
404 ) {
405 debug!(
406 "tables append timestamp={:?} advance_to={:?} len={} ids={:?}{}",
407 write_ts,
408 advance_to,
409 updates.iter().flat_map(|(_, x)| x).count(),
410 updates
411 .iter()
412 .map(|(x, _)| x.to_string())
413 .collect::<BTreeSet<_>>(),
414 updates.iter().filter(|(_, v)| !v.is_empty()).fold(
415 String::new(),
416 |mut output, (k, v)| {
417 let _ = write!(output, "\n {}: {:?}", k, v.first());
418 output
419 }
420 )
421 );
422 assert_eq!(
428 advance_to,
429 mz_persist_types::StepForward::step_forward(&write_ts)
430 );
431
432 let mut txn = self.txns.begin();
433 for (id, updates) in updates {
434 let Some(data_id) = self.write_handles.get(&id) else {
435 assert!(
439 updates.iter().all(|u| u.is_empty()),
440 "{}: {:?}",
441 id,
442 updates
443 );
444 continue;
445 };
446 for update in updates {
447 match update {
448 TableData::Rows(updates) => {
449 for (row, diff) in updates {
450 let () = txn
451 .write(data_id, SourceData(Ok(row)), (), diff.into_inner())
452 .await;
453 }
454 }
455 TableData::Batches(batches) => {
456 for batch in batches {
457 let () = txn.write_batch(data_id, batch);
458 }
459 }
460 }
461 }
462 }
463 txn.tidy(std::mem::take(&mut self.tidy));
465 let txn_res = txn.commit_at(&mut self.txns, write_ts.clone()).await;
466 let response = match txn_res {
467 Ok(apply) => {
468 debug!("applying {:?}", apply);
471 let tidy = apply.apply(&mut self.txns).await;
472 self.tidy.merge(tidy);
473
474 let () = self.txns.compact_to(write_ts).await;
478
479 Ok(())
480 }
481 Err(current) => {
482 self.tidy.merge(txn.take_tidy());
483 debug!(
484 "unable to commit txn at {:?} current={:?}",
485 write_ts, current
486 );
487 Err(StorageError::InvalidUppers(
488 self.write_handles
489 .keys()
490 .copied()
491 .map(|id| InvalidUpper {
492 id,
493 current_upper: Antichain::from_elem(current.clone()),
494 })
495 .collect(),
496 ))
497 }
498 };
499 let _ = tx.send(response);
501 }
502}
503
504#[derive(Debug)]
513struct PersistTableWriteWorkerInner<T: Timestamp + Lattice + Codec64 + TimestampManipulation> {
514 tx: UnboundedSender<(tracing::Span, PersistTableWriteCmd<T>)>,
516}
517
518impl<T> Drop for PersistTableWriteWorkerInner<T>
519where
520 T: Timestamp + Lattice + Codec64 + TimestampManipulation,
521{
522 fn drop(&mut self) {
523 self.send(PersistTableWriteCmd::Shutdown);
524 }
526}
527
528impl<T> PersistTableWriteWorkerInner<T>
529where
530 T: Timestamp + Lattice + Codec64 + TimestampManipulation,
531{
532 fn new(tx: UnboundedSender<(tracing::Span, PersistTableWriteCmd<T>)>) -> Self {
533 PersistTableWriteWorkerInner { tx }
534 }
535
536 fn send(&self, cmd: PersistTableWriteCmd<T>) {
537 let span =
538 info_span!(parent: None, "PersistTableWriteWorkerInner::send", otel.name = cmd.name());
539 OpenTelemetryContext::obtain().attach_as_parent_to(&span);
540 self.send_with_span(span, cmd)
541 }
542
543 fn send_with_span(&self, span: Span, cmd: PersistTableWriteCmd<T>) {
544 match self.tx.send((span, cmd)) {
545 Ok(()) => (), Err(e) => {
547 tracing::trace!("could not forward command: {:?}", e);
548 }
549 }
550 }
551}