use std::collections::btree_map::Entry;
use std::collections::hash_map::DefaultHasher;
use std::collections::{BTreeMap, BTreeSet};
use std::hash::{Hash, Hasher};
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use async_trait::async_trait;
use differential_dataflow::consolidation::consolidate_updates;
use mz_dyncfg::ConfigUpdates;
use mz_ore::metrics::MetricsRegistry;
use mz_ore::now::{NOW_ZERO, SYSTEM_TIME};
use mz_persist::cfg::{BlobConfig, ConsensusConfig};
use mz_persist::location::{Blob, Consensus, ExternalError};
use mz_persist::unreliable::{UnreliableBlob, UnreliableConsensus, UnreliableHandle};
use mz_persist_client::async_runtime::IsolatedRuntime;
use mz_persist_client::cache::StateCache;
use mz_persist_client::cfg::PersistConfig;
use mz_persist_client::metrics::Metrics as PersistMetrics;
use mz_persist_client::read::ReadHandle;
use mz_persist_client::rpc::PubSubClientConnection;
use mz_persist_client::{Diagnostics, PersistClient, ShardId};
use mz_persist_types::codec_impls::{StringSchema, UnitSchema};
use mz_timestamp_oracle::TimestampOracle;
use mz_timestamp_oracle::postgres_oracle::{
    PostgresTimestampOracle, PostgresTimestampOracleConfig,
};
use mz_txn_wal::metrics::Metrics as TxnMetrics;
use mz_txn_wal::operator::DataSubscribeTask;
use mz_txn_wal::txns::{Tidy, TxnsHandle};
use timely::progress::Timestamp;
use tokio::sync::Mutex;
use tracing::{debug, info};

use crate::maelstrom::Args;
use crate::maelstrom::api::{Body, MaelstromError, NodeId, ReqTxnOp, ResTxnOp};
use crate::maelstrom::node::{Handle, Service};
use crate::maelstrom::services::{
    CachingBlob, MaelstromBlob, MaelstromConsensus, MemTimestampOracle,
};

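/// Client state for running the Maelstrom txn-list-append workload on top of
/// txn-wal: each Maelstrom key maps to its own persist data shard (see
/// [Transactor::key_shard]), all writes are routed through a single txns
/// shard, and timestamps are selected by a [TimestampOracle].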
#[derive(Debug)]
pub struct Transactor {
    txns_id: ShardId,
    oracle: Box<dyn TimestampOracle<mz_repr::Timestamp> + Send>,
    client: PersistClient,
    txns: TxnsHandle<String, (), u64, i64>,
    tidy: Tidy,
    data_reads: BTreeMap<ShardId, (u64, ReadHandle<String, (), u64, i64>)>,
    peeks: BTreeMap<ShardId, DataSubscribeTask>,
}

impl Transactor {
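    /// Opens a [TxnsHandle] for `txns_id` at a write timestamp from the
    /// oracle, then reports that timestamp back to the oracle via
    /// `apply_write` to keep the two in agreement.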
    pub async fn new(
        client: PersistClient,
        txns_id: ShardId,
        oracle: Box<dyn TimestampOracle<mz_repr::Timestamp> + Send>,
    ) -> Result<Self, MaelstromError> {
        let init_ts = u64::from(oracle.write_ts().await.timestamp);
        let txns = TxnsHandle::open(
            init_ts,
            client.clone(),
            mz_txn_wal::all_dyncfgs(client.dyncfgs().clone()),
            Arc::new(TxnMetrics::new(&MetricsRegistry::new())),
            txns_id,
        )
        .await;
        oracle.apply_write(init_ts.into()).await;
        Ok(Transactor {
            txns_id,
            oracle,
            txns,
            tidy: Tidy::default(),
            client,
            data_reads: BTreeMap::default(),
            peeks: BTreeMap::default(),
        })
    }

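    /// Runs the given Maelstrom txn ops as a single atomic transaction:
    /// reads happen at an oracle read timestamp and any appends commit
    /// together at a single oracle write timestamp.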
    pub async fn transact(
        &mut self,
        req_ops: &[ReqTxnOp],
    ) -> Result<Vec<ResTxnOp>, MaelstromError> {
        let mut read_ids = Vec::new();
        let mut writes = BTreeMap::<ShardId, Vec<(String, i64)>>::new();
        for op in req_ops {
            match op {
                ReqTxnOp::Read { key } => {
                    read_ids.push(self.key_shard(*key));
                }
                ReqTxnOp::Append { key, val } => writes
                    .entry(self.key_shard(*key))
                    .or_default()
                    .push((val.to_string(), 1)),
            }
        }

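        // Ensure that every data shard this request touches is registered
        // with the txns shard before we read or write it.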
        for data_id in writes.keys().chain(read_ids.iter()) {
            let _init_ts = self.ensure_registered(data_id).await?;
        }

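        // Pick a read timestamp from the oracle and run the reads at it,
        // starting the data subscriptions fresh for each request.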
        let mut read_ts = u64::from(self.oracle.read_ts().await);
        info!("read ts {}", read_ts);
        self.peeks.clear();
        self.read_at(read_ts, read_ids.iter()).await;
        if writes.is_empty() {
            debug!("req committed at read_ts={}", read_ts);
        } else {
            let mut txn = self.txns.begin();
            for (data_id, writes) in writes {
                for (data, diff) in writes {
                    txn.write(&data_id, data, (), diff).await;
                }
            }
            let mut write_ts = u64::from(self.oracle.write_ts().await.timestamp);
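            // Commit at the oracle's write timestamp, retrying if another
            // writer gets there first.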
            loop {
                let new_read_ts = write_ts.checked_sub(1).expect("write_ts should be > 0");
                info!("read ts {} write ts {}", new_read_ts, write_ts);
                // If the commit timestamp has moved past the original read
                // timestamp, redo the reads at write_ts - 1 so they stay
                // adjacent to the write.
                if new_read_ts != read_ts {
                    self.unblock_and_read_at(new_read_ts, read_ids.iter()).await;
                    read_ts = new_read_ts;
                }

                txn.tidy(std::mem::take(&mut self.tidy));
                match txn.commit_at(&mut self.txns, write_ts).await {
                    Ok(maintenance) => {
                        self.oracle.apply_write(write_ts.into()).await;
                        // Aggressively compact the txns shard to exercise
                        // more edge cases.
                        self.txns.compact_to(write_ts).await;

                        debug!("req committed at read_ts={} write_ts={}", read_ts, write_ts);
                        let tidy = maintenance.apply(&mut self.txns).await;
                        self.tidy.merge(tidy);
                        break;
                    }
                    // The commit raced with another txn; retry at the
                    // returned current upper of the txns shard.
                    Err(current) => {
                        write_ts = current;
                        continue;
                    }
                }
            }
        }

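        // Assemble the response: reads come from the peek output as of
        // read_ts, overlaid with any appends made earlier in this same
        // request.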
        let mut this_txn_writes = BTreeMap::<_, Vec<_>>::new();

        let res = req_ops
            .iter()
            .map(|op| match op {
                ReqTxnOp::Read { key } => {
                    let key_shard = self.key_shard(*key);
                    let mut data = self
                        .peeks
                        .get(&key_shard)
                        .expect("key should have been read")
                        .output()
                        .iter()
                        // The subscription may have progressed past read_ts;
                        // ignore any updates that are too new.
                        .filter(|(_, t, _)| *t <= read_ts)
                        .map(|(k, t, d)| {
                            let k = k.parse().expect("valid u64");
                            (k, *t, *d)
                        })
                        .collect::<Vec<_>>();
                    let mut seen = BTreeSet::new();
                    let mut val = Vec::new();
                    consolidate_updates(&mut data);
                    data.sort_by_key(|(k, t, d)| (*t, *k, std::cmp::Reverse(*d)));
                    debug!(
                        "{} {:.9} read after sort {:?}",
                        key,
                        key_shard.to_string(),
                        data
                    );
                    for (x, _, d) in data {
                        if d == 1 {
                            assert!(seen.insert(x));
                            val.push(x);
                        } else if d == -1 {
                            assert!(seen.remove(&x));
                            val.retain(|y| *y != x);
                        } else {
                            panic!("unexpected diff: {}", d);
                        }
                    }
                    if let Some(this_writes) = this_txn_writes.get(key) {
                        val.extend(this_writes.iter().copied());
                    }
                    ResTxnOp::Read { key: *key, val }
                }
                ReqTxnOp::Append { key, val } => {
                    this_txn_writes.entry(key).or_default().push(val);
                    ResTxnOp::Append {
                        key: *key,
                        val: *val,
                    }
                }
            })
            .collect();
        Ok(res)
    }

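    /// Ensures that `data_id` is registered with the txns shard, registering
    /// it at a fresh oracle write timestamp if necessary, and returns the
    /// timestamp it was registered at.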
    async fn ensure_registered(&mut self, data_id: &ShardId) -> Result<u64, ExternalError> {
        if let Some((init_ts, _)) = self.data_reads.get(data_id) {
            return Ok(*init_ts);
        }

        let data_read = self
            .client
            .open_leased_reader(
                *data_id,
                Arc::new(StringSchema),
                Arc::new(UnitSchema),
                Diagnostics::from_purpose("txn data"),
                true,
            )
            .await
            .expect("data schema shouldn't change");

        let mut init_ts = u64::from(self.oracle.write_ts().await.timestamp);
        loop {
            let data_write = self
                .client
                .open_writer(
                    *data_id,
                    Arc::new(StringSchema),
                    Arc::new(UnitSchema),
                    Diagnostics::from_purpose("txn data"),
                )
                .await
                .expect("data schema shouldn't change");
            let res = self.txns.register(init_ts, [data_write]).await;
            match res {
                Ok(_) => {
                    self.oracle.apply_write(init_ts.into()).await;
                    self.data_reads.insert(*data_id, (init_ts, data_read));
                    return Ok(init_ts);
                }
                Err(new_init_ts) => {
                    debug!(
                        "register {:.9} at {} mismatch current={}",
                        data_id, init_ts, new_init_ts
                    );
                    init_ts = u64::from(self.oracle.write_ts().await.timestamp);
                    continue;
                }
            }
        }
    }

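    /// Advances the peek for each data shard past `read_ts`, first applying
    /// all txns committed at or before it so the physical shard contents
    /// catch up to the logical ones.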
    async fn read_at(&mut self, read_ts: u64, data_ids: impl Iterator<Item = &ShardId>) {
        let tidy = self.txns.apply_le(&read_ts).await;
        self.tidy.merge(tidy);

        for data_id in data_ids {
            let peek = match self.peeks.entry(*data_id) {
                Entry::Occupied(x) => x.into_mut(),
                Entry::Vacant(x) => {
                    let peek =
                        DataSubscribeTask::new(self.client.clone(), self.txns_id, *data_id, 0)
                            .await;
                    x.insert(peek)
                }
            };
            peek.step_past(read_ts).await;
        }
    }

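    /// Like [Self::read_at], but first commits an empty txn at `read_ts` so
    /// the data shards' frontiers are guaranteed to advance past it and the
    /// reads can complete.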
    async fn unblock_and_read_at(
        &mut self,
        read_ts: u64,
        data_ids: impl Iterator<Item = &ShardId>,
    ) {
        debug!("unblock_and_read_at {}", read_ts);
        let mut txn = self.txns.begin();
        match txn.commit_at(&mut self.txns, read_ts).await {
            Ok(apply) => {
                self.tidy.merge(apply.apply(&mut self.txns).await);
            }
            // If the commit fails, some other txn has already advanced the
            // upper past read_ts, which unblocks the read just as well.
            Err(_) => {}
        }
        self.read_at(read_ts, data_ids).await
    }

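    /// Deterministically maps a Maelstrom key to a data [ShardId] by hashing
    /// the key together with the txns shard id, so every node derives the
    /// same shard for the same key.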
    fn key_shard(&self, key: u64) -> ShardId {
        let mut h = DefaultHasher::new();
        key.hash(&mut h);
        self.txns_id.hash(&mut h);
        let mut buf = [0u8; 16];
        buf[0..8].copy_from_slice(&h.finish().to_le_bytes());
        let shard_id = format!("s{}", uuid::Uuid::from_bytes(buf));
        shard_id.parse().expect("valid shard id")
    }
}

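/// A [Service] that serializes concurrent Maelstrom requests through a
/// [Mutex]-wrapped [Transactor].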
#[derive(Debug)]
pub struct TransactorService(pub Arc<Mutex<Transactor>>);

#[async_trait]
impl Service for TransactorService {
    async fn init(args: &Args, handle: &Handle) -> Result<Self, MaelstromError> {
        let shard_id = handle.maybe_init_shard_id().await?;

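        // Wrap blob and consensus in an unreliability layer: each operation
        // is allowed to happen with probability `1 - args.unreliability` and
        // to time out with probability `args.unreliability`. Seed it from
        // the wall clock so runs differ.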
        let seed: u64 = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .subsec_nanos()
            .into();
        let should_happen = 1.0 - args.unreliability;
        let should_timeout = args.unreliability;
        let unreliable = UnreliableHandle::new(seed, should_happen, should_timeout);

        let mut config =
            PersistConfig::new_default_configs(&mz_persist_client::BUILD_INFO, SYSTEM_TIME.clone());
        {
            let mut updates = ConfigUpdates::default();
            updates.add(&mz_persist::postgres::USE_POSTGRES_TUNED_QUERIES, true);
            config.apply_from(&updates);
        }

        let metrics_registry = MetricsRegistry::new();
        let metrics = Arc::new(PersistMetrics::new(&config, &metrics_registry));

        let blob = match &args.blob_uri {
            Some(blob_uri) => {
                let cfg = BlobConfig::try_from(
                    blob_uri,
                    Box::new(config.clone()),
                    metrics.s3_blob.clone(),
                    Arc::clone(&config.configs),
                )
                .await
                .expect("blob_uri should be valid");
                loop {
                    match cfg.clone().open().await {
                        Ok(x) => break x,
                        Err(err) => {
                            info!("failed to open blob, trying again: {}", err);
                        }
                    }
                }
            }
            None => MaelstromBlob::new(handle.clone()),
        };
        let blob: Arc<dyn Blob> = Arc::new(UnreliableBlob::new(blob, unreliable.clone()));
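        // Cache blob fetches locally; persist blobs are immutable, and the
        // Maelstrom blob service in particular round-trips every get through
        // the Maelstrom network protocol.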
        let blob = CachingBlob::new(blob);
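        // Tighten timings and limits from their production defaults so that
        // short Maelstrom runs exercise more of persist's maintenance
        // machinery.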
        config.critical_downgrade_interval = Duration::from_secs(0);
        config.set_state_versions_recent_live_diffs_limit(5);
        let consensus = match &args.consensus_uri {
            Some(consensus_uri) => {
                let cfg = ConsensusConfig::try_from(
                    consensus_uri,
                    Box::new(config.clone()),
                    metrics.postgres_consensus.clone(),
                    Arc::clone(&config.configs),
                )
                .expect("consensus_uri should be valid");
                loop {
                    match cfg.clone().open().await {
                        Ok(x) => break x,
                        Err(err) => {
                            info!("failed to open consensus, trying again: {}", err);
                        }
                    }
                }
            }
            None => MaelstromConsensus::new(handle.clone()),
        };
        let consensus: Arc<dyn Consensus> =
            Arc::new(UnreliableConsensus::new(consensus, unreliable));

        let isolated_runtime = Arc::new(IsolatedRuntime::new_for_tests());
        let pubsub_sender = PubSubClientConnection::noop().sender;
        let shared_states = Arc::new(StateCache::new(
            &config,
            Arc::clone(&metrics),
            Arc::clone(&pubsub_sender),
        ));
        let client = PersistClient::new(
            config,
            blob,
            consensus,
            metrics,
            isolated_runtime,
            shared_states,
            pubsub_sender,
        )?;
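        // Choose a timestamp oracle based on the scheme of the consensus
        // URI: postgres gets the production Postgres oracle, "mem" gets an
        // in-memory one for testing.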
        let oracle_uri = args.consensus_uri.clone();
        let oracle_scheme = oracle_uri.as_ref().map(|x| (x.scheme(), x));
        let oracle: Box<dyn TimestampOracle<mz_repr::Timestamp> + Send> = match oracle_scheme {
            Some(("postgres", uri)) | Some(("postgresql", uri)) => {
                let cfg = PostgresTimestampOracleConfig::new(uri, &metrics_registry);
                Box::new(
                    PostgresTimestampOracle::open(
                        cfg,
                        "maelstrom".to_owned(),
                        mz_repr::Timestamp::minimum(),
                        NOW_ZERO.clone(),
                        false,
                    )
                    .await,
                )
            }
            Some(("mem", _)) => Box::new(MemTimestampOracle::default()),
            Some((scheme, _)) => unimplemented!("unsupported oracle type: {}", scheme),
            None => unimplemented!("TODO: support maelstrom oracle"),
        };
        let transactor = Transactor::new(client, shard_id, oracle).await?;
        let service = TransactorService(Arc::new(Mutex::new(transactor)));
        Ok(service)
    }

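    /// Dispatches a Maelstrom request, replying with the txn result on
    /// success or a Maelstrom error body on failure.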
    async fn eval(&self, handle: Handle, src: NodeId, req: Body) {
        match req {
            Body::ReqTxn { msg_id, txn } => {
                let in_reply_to = msg_id;
                match self.0.lock().await.transact(&txn).await {
                    Ok(txn) => handle.send_res(src, |msg_id| Body::ResTxn {
                        msg_id,
                        in_reply_to,
                        txn,
                    }),
                    Err(MaelstromError { code, text }) => {
                        handle.send_res(src, |msg_id| Body::Error {
                            msg_id: Some(msg_id),
                            in_reply_to,
                            code,
                            text,
                        })
                    }
                }
            }
            req => unimplemented!("unsupported req: {:?}", req),
        }
    }
}