1use std::collections::btree_map::Entry;
14use std::collections::hash_map::DefaultHasher;
15use std::collections::{BTreeMap, BTreeSet};
16use std::hash::{Hash, Hasher};
17use std::sync::Arc;
18use std::time::{Duration, SystemTime, UNIX_EPOCH};
19
20use async_trait::async_trait;
21use differential_dataflow::consolidation::consolidate_updates;
22use mz_ore::metrics::MetricsRegistry;
23use mz_ore::now::{NOW_ZERO, SYSTEM_TIME};
24use mz_persist::cfg::{BlobConfig, ConsensusConfig};
25use mz_persist::location::{Blob, Consensus, ExternalError};
26use mz_persist::unreliable::{UnreliableBlob, UnreliableConsensus, UnreliableHandle};
27use mz_persist_client::async_runtime::IsolatedRuntime;
28use mz_persist_client::cache::StateCache;
29use mz_persist_client::cfg::PersistConfig;
30use mz_persist_client::metrics::Metrics as PersistMetrics;
31use mz_persist_client::read::ReadHandle;
32use mz_persist_client::rpc::PubSubClientConnection;
33use mz_persist_client::{Diagnostics, PersistClient, ShardId};
34use mz_persist_types::codec_impls::{StringSchema, UnitSchema};
35use mz_timestamp_oracle::TimestampOracle;
36use mz_timestamp_oracle::postgres_oracle::{
37 PostgresTimestampOracle, PostgresTimestampOracleConfig,
38};
39use mz_txn_wal::metrics::Metrics as TxnMetrics;
40use mz_txn_wal::operator::DataSubscribeTask;
41use mz_txn_wal::txns::{Tidy, TxnsHandle};
42use timely::progress::Timestamp;
43use tokio::sync::Mutex;
44use tracing::{debug, info};
45
46use crate::maelstrom::Args;
47use crate::maelstrom::api::{Body, MaelstromError, NodeId, ReqTxnOp, ResTxnOp};
48use crate::maelstrom::node::{Handle, Service};
49use crate::maelstrom::services::{
50 CachingBlob, MaelstromBlob, MaelstromConsensus, MemTimestampOracle,
51};
52
/// Serves Maelstrom `txn-list-append` requests on top of persist and txn-wal.
///
/// Every Maelstrom key is mapped (by `key_shard`) to a data shard, and all
/// appends in a request are committed together through a single txns shard.
#[derive(Debug)]
pub struct Transactor {
    /// Id of the txns shard through which writes are committed; also mixed
    /// into the key -> data shard mapping.
    txns_id: ShardId,
    /// Source of read and write timestamps; committed write timestamps are
    /// reported back to it via `apply_write`.
    oracle: Box<dyn TimestampOracle<mz_repr::Timestamp> + Send>,
    /// Client used to open data shard readers, writers, and subscribes.
    client: PersistClient,
    /// Handle to the txns shard.
    txns: TxnsHandle<String, (), u64, i64>,
    /// Accumulated tidy work, handed to the next txn that commits.
    tidy: Tidy,
    /// Data shards already registered with the txns shard: the timestamp each
    /// was registered at, plus a leased reader kept open for it.
    data_reads: BTreeMap<ShardId, (u64, ReadHandle<String, (), u64, i64>)>,
    /// Subscribes used to read data shards; cleared at the start of every
    /// request.
    peeks: BTreeMap<ShardId, DataSubscribeTask>,
}
63
64impl Transactor {
65 pub async fn new(
66 client: PersistClient,
67 txns_id: ShardId,
68 oracle: Box<dyn TimestampOracle<mz_repr::Timestamp> + Send>,
69 ) -> Result<Self, MaelstromError> {
70 let init_ts = u64::from(oracle.write_ts().await.timestamp);
71 let txns = TxnsHandle::open(
72 init_ts,
73 client.clone(),
74 mz_txn_wal::all_dyncfgs(client.dyncfgs().clone()),
75 Arc::new(TxnMetrics::new(&MetricsRegistry::new())),
76 txns_id,
77 )
78 .await;
79 oracle.apply_write(init_ts.into()).await;
80 Ok(Transactor {
81 txns_id,
82 oracle,
83 txns,
84 tidy: Tidy::default(),
85 client,
86 data_reads: BTreeMap::default(),
87 peeks: BTreeMap::default(),
88 })
89 }
90
91 pub async fn transact(
92 &mut self,
93 req_ops: &[ReqTxnOp],
94 ) -> Result<Vec<ResTxnOp>, MaelstromError> {
95 let mut read_ids = Vec::new();
96 let mut writes = BTreeMap::<ShardId, Vec<(String, i64)>>::new();
97 for op in req_ops {
98 match op {
99 ReqTxnOp::Read { key } => {
100 read_ids.push(self.key_shard(*key));
101 }
102 ReqTxnOp::Append { key, val } => writes
103 .entry(self.key_shard(*key))
104 .or_default()
105 .push((val.to_string(), 1)),
106 }
107 }
108
109 for data_id in writes.keys().chain(read_ids.iter()) {
111 let _init_ts = self.ensure_registered(data_id).await;
112 }
113
114 let mut read_ts = u64::from(self.oracle.read_ts().await);
116 info!("read ts {}", read_ts);
117 self.peeks.clear();
118 self.read_at(read_ts, read_ids.iter()).await;
119 if writes.is_empty() {
120 debug!("req committed at read_ts={}", read_ts);
121 } else {
122 let mut txn = self.txns.begin();
123 for (data_id, writes) in writes {
124 for (data, diff) in writes {
125 txn.write(&data_id, data, (), diff).await;
126 }
127 }
128 let mut write_ts = u64::from(self.oracle.write_ts().await.timestamp);
129 loop {
130 let new_read_ts = write_ts.checked_sub(1).expect("write_ts should be > 0");
136 info!("read ts {} write ts {}", new_read_ts, write_ts);
137 if new_read_ts != read_ts {
138 self.unblock_and_read_at(new_read_ts, read_ids.iter()).await;
139 read_ts = new_read_ts;
140 }
141
142 txn.tidy(std::mem::take(&mut self.tidy));
143 match txn.commit_at(&mut self.txns, write_ts).await {
144 Ok(maintenance) => {
145 self.oracle.apply_write(write_ts.into()).await;
146 self.txns.compact_to(write_ts).await;
150
151 debug!("req committed at read_ts={} write_ts={}", read_ts, write_ts);
152 let tidy = maintenance.apply(&mut self.txns).await;
153 self.tidy.merge(tidy);
154 break;
155 }
156 Err(current) => {
157 write_ts = current;
158 continue;
161 }
162 }
163 }
164 }
165
166 let mut this_txn_writes = BTreeMap::<_, Vec<_>>::new();
171
172 let res = req_ops
173 .iter()
174 .map(|op| match op {
175 ReqTxnOp::Read { key } => {
176 let key_shard = self.key_shard(*key);
177 let mut data = self
178 .peeks
179 .get(&key_shard)
180 .expect("key should have been read")
181 .output()
182 .iter()
183 .filter(|(_, t, _)| *t <= read_ts)
187 .map(|(k, t, d)| {
188 let k = k.parse().expect("valid u64");
189 (k, *t, *d)
190 })
191 .collect::<Vec<_>>();
192 let mut seen = BTreeSet::new();
193 let mut val = Vec::new();
194 consolidate_updates(&mut data);
195 data.sort_by_key(|(k, t, d)| (*t, *k, std::cmp::Reverse(*d)));
199 debug!(
200 "{} {:.9} read after sort {:?}",
201 key,
202 key_shard.to_string(),
203 data
204 );
205 for (x, _, d) in data {
206 if d == 1 {
207 assert!(seen.insert(x));
208 val.push(x);
209 } else if d == -1 {
210 assert!(seen.remove(&x));
211 val.retain(|y| *y != x);
212 } else {
213 panic!("unexpected diff: {}", d);
214 }
215 }
216 if let Some(this_writes) = this_txn_writes.get(key) {
217 val.extend(this_writes.iter().copied());
218 }
219 ResTxnOp::Read { key: *key, val }
220 }
221 ReqTxnOp::Append { key, val } => {
222 this_txn_writes.entry(key).or_default().push(val);
223 ResTxnOp::Append {
224 key: *key,
225 val: *val,
226 }
227 }
228 })
229 .collect();
230 Ok(res)
231 }
232
233 async fn ensure_registered(&mut self, data_id: &ShardId) -> Result<u64, ExternalError> {
235 if let Some((init_ts, _)) = self.data_reads.get(data_id) {
237 return Ok(*init_ts);
238 }
239
240 let data_read = self
242 .client
243 .open_leased_reader(
244 *data_id,
245 Arc::new(StringSchema),
246 Arc::new(UnitSchema),
247 Diagnostics::from_purpose("txn data"),
248 true,
249 )
250 .await
251 .expect("data schema shouldn't change");
252
253 let mut init_ts = u64::from(self.oracle.write_ts().await.timestamp);
254 loop {
255 let data_write = self
256 .client
257 .open_writer(
258 *data_id,
259 Arc::new(StringSchema),
260 Arc::new(UnitSchema),
261 Diagnostics::from_purpose("txn data"),
262 )
263 .await
264 .expect("data schema shouldn't change");
265 let res = self.txns.register(init_ts, [data_write]).await;
266 match res {
267 Ok(_) => {
268 self.oracle.apply_write(init_ts.into()).await;
269 self.data_reads.insert(*data_id, (init_ts, data_read));
270 return Ok(init_ts);
271 }
272 Err(new_init_ts) => {
273 debug!(
274 "register {:.9} at {} mismatch current={}",
275 data_id, init_ts, new_init_ts
276 );
277 init_ts = u64::from(self.oracle.write_ts().await.timestamp);
278 continue;
279 }
280 }
281 }
282 }
283
284 async fn read_at(&mut self, read_ts: u64, data_ids: impl Iterator<Item = &ShardId>) {
285 let tidy = self.txns.apply_le(&read_ts).await;
287 self.tidy.merge(tidy);
288
289 for data_id in data_ids {
297 let peek = match self.peeks.entry(*data_id) {
298 Entry::Occupied(x) => x.into_mut(),
299 Entry::Vacant(x) => {
300 let peek =
301 DataSubscribeTask::new(self.client.clone(), self.txns_id, *data_id, 0)
302 .await;
303 x.insert(peek)
304 }
305 };
306 peek.step_past(read_ts).await;
307 }
308 }
309
310 async fn unblock_and_read_at(
311 &mut self,
312 read_ts: u64,
313 data_ids: impl Iterator<Item = &ShardId>,
314 ) {
315 debug!("unblock_and_read_at {}", read_ts);
316 let mut txn = self.txns.begin();
317 match txn.commit_at(&mut self.txns, read_ts).await {
318 Ok(apply) => {
319 self.tidy.merge(apply.apply(&mut self.txns).await);
320 }
321 Err(_) => {}
323 }
324 self.read_at(read_ts, data_ids).await
325 }
326
327 fn key_shard(&self, key: u64) -> ShardId {
331 let mut h = DefaultHasher::new();
332 key.hash(&mut h);
333 self.txns_id.hash(&mut h);
334 let mut buf = [0u8; 16];
335 buf[0..8].copy_from_slice(&h.finish().to_le_bytes());
336 let shard_id = format!("s{}", uuid::Uuid::from_bytes(buf));
337 shard_id.parse().expect("valid shard id")
338 }
339}
340
/// Maelstrom [`Service`] wrapping a single shared [`Transactor`].
///
/// The transactor sits behind an async `Mutex`, and `eval` holds the lock
/// while a request is transacted, so requests are processed one at a time.
#[derive(Debug)]
pub struct TransactorService(pub Arc<Mutex<Transactor>>);
344
#[async_trait]
impl Service for TransactorService {
    /// Builds a persist client and timestamp oracle, then opens a
    /// [`Transactor`] on the txns shard id returned by
    /// `handle.maybe_init_shard_id()`.
    ///
    /// Blob and consensus come from `args.blob_uri` / `args.consensus_uri`
    /// when set, otherwise from the Maelstrom-backed services. Either way they
    /// are wrapped in `UnreliableBlob` / `UnreliableConsensus`. The oracle
    /// implementation is chosen by the scheme of `args.consensus_uri`.
    ///
    /// # Panics
    ///
    /// Panics if a blob or consensus URI is invalid, or if `consensus_uri` is
    /// absent or its scheme is not `postgres`, `postgresql`, or `mem`.
    async fn init(args: &Args, handle: &Handle) -> Result<Self, MaelstromError> {
        let shard_id = handle.maybe_init_shard_id().await?;

        // Seed for the unreliability handle: the sub-second nanos of the wall
        // clock (0 if the clock reads earlier than the UNIX epoch).
        let seed: u64 = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .subsec_nanos()
            .into();
        let should_happen = 1.0 - args.unreliability;
        let should_timeout = args.unreliability;
        let unreliable = UnreliableHandle::new(seed, should_happen, should_timeout);

        let mut config =
            PersistConfig::new_default_configs(&mz_persist_client::BUILD_INFO, SYSTEM_TIME.clone());
        let metrics_registry = MetricsRegistry::new();
        let metrics = Arc::new(PersistMetrics::new(&config, &metrics_registry));

        // Configured blob if a URI was given, otherwise the Maelstrom-backed one.
        let blob = match &args.blob_uri {
            Some(blob_uri) => {
                let cfg = BlobConfig::try_from(
                    blob_uri,
                    Box::new(config.clone()),
                    metrics.s3_blob.clone(),
                    Arc::clone(&config.configs),
                )
                .await
                .expect("blob_uri should be valid");
                // Retry until the open succeeds.
                // NOTE(review): there is no delay or attempt limit between
                // retries, so a persistently failing open loops (and logs)
                // indefinitely.
                loop {
                    match cfg.clone().open().await {
                        Ok(x) => break x,
                        Err(err) => {
                            info!("failed to open blob, trying again: {}", err);
                        }
                    }
                }
            }
            None => MaelstromBlob::new(handle.clone()),
        };
        let blob: Arc<dyn Blob> = Arc::new(UnreliableBlob::new(blob, unreliable.clone()));
        let blob = CachingBlob::new(blob);

        // NOTE(review): statement order matters here. These changes run after
        // `config.clone()` was handed to `BlobConfig::try_from` above, so that
        // copy does not see them; the consensus config and `PersistClient`
        // below do. Presumably they are tuned aggressively to exercise more
        // code paths — confirm.
        config.critical_downgrade_interval = Duration::from_secs(0);
        config.set_state_versions_recent_live_diffs_limit(5);
        // Configured consensus if a URI was given, otherwise the
        // Maelstrom-backed one.
        let consensus = match &args.consensus_uri {
            Some(consensus_uri) => {
                let cfg = ConsensusConfig::try_from(
                    consensus_uri,
                    Box::new(config.clone()),
                    metrics.postgres_consensus.clone(),
                )
                .expect("consensus_uri should be valid");
                // Same unbounded, no-delay retry as for blob above.
                loop {
                    match cfg.clone().open().await {
                        Ok(x) => break x,
                        Err(err) => {
                            info!("failed to open consensus, trying again: {}", err);
                        }
                    }
                }
            }
            None => MaelstromConsensus::new(handle.clone()),
        };
        let consensus: Arc<dyn Consensus> =
            Arc::new(UnreliableConsensus::new(consensus, unreliable));

        let isolated_runtime = Arc::new(IsolatedRuntime::default());
        // No pubsub: use the sender of a no-op connection.
        let pubsub_sender = PubSubClientConnection::noop().sender;
        let shared_states = Arc::new(StateCache::new(
            &config,
            Arc::clone(&metrics),
            Arc::clone(&pubsub_sender),
        ));
        let client = PersistClient::new(
            config,
            blob,
            consensus,
            metrics,
            isolated_runtime,
            shared_states,
            pubsub_sender,
        )?;
        // The oracle implementation is chosen by the consensus URI's scheme.
        let oracle_uri = args.consensus_uri.clone();
        let oracle_scheme = oracle_uri.as_ref().map(|x| (x.scheme(), x));
        let oracle: Box<dyn TimestampOracle<mz_repr::Timestamp> + Send> = match oracle_scheme {
            Some(("postgres", uri)) | Some(("postgresql", uri)) => {
                let cfg = PostgresTimestampOracleConfig::new(uri, &metrics_registry);
                Box::new(
                    PostgresTimestampOracle::open(
                        cfg,
                        // NOTE(review): presumably the oracle's timeline name — confirm.
                        "maelstrom".to_owned(),
                        mz_repr::Timestamp::minimum(),
                        NOW_ZERO.clone(),
                        // NOTE(review): meaning of this flag is not visible
                        // here; confirm against `PostgresTimestampOracle::open`.
                        false,
                    )
                    .await,
                )
            }
            Some(("mem", _)) => Box::new(MemTimestampOracle::default()),
            Some((scheme, _)) => unimplemented!("unsupported oracle type: {}", scheme),
            None => unimplemented!("TODO: support maelstrom oracle"),
        };
        let transactor = Transactor::new(client, shard_id, oracle).await?;
        let service = TransactorService(Arc::new(Mutex::new(transactor)));
        Ok(service)
    }

    /// Handles one Maelstrom request from `src`.
    ///
    /// `ReqTxn` requests are run through the shared [`Transactor`]. Its lock
    /// is held for the whole `match` (including `transact`), so requests run
    /// one at a time. The reply is a `ResTxn` on success or an `Error` body
    /// carrying the [`MaelstromError`]'s code and text. Any other request
    /// panics via `unimplemented!`.
    async fn eval(&self, handle: Handle, src: NodeId, req: Body) {
        match req {
            Body::ReqTxn { msg_id, txn } => {
                let in_reply_to = msg_id;
                match self.0.lock().await.transact(&txn).await {
                    Ok(txn) => handle.send_res(src, |msg_id| Body::ResTxn {
                        msg_id,
                        in_reply_to,
                        txn,
                    }),
                    Err(MaelstromError { code, text }) => {
                        handle.send_res(src, |msg_id| Body::Error {
                            msg_id: Some(msg_id),
                            in_reply_to,
                            code,
                            text,
                        })
                    }
                }
            }
            req => unimplemented!("unsupported req: {:?}", req),
        }
    }
}