1use std::collections::BTreeMap;
14use std::sync::Arc;
15use std::time::{Duration, SystemTime, UNIX_EPOCH};
16
17use async_trait::async_trait;
18use differential_dataflow::consolidation::consolidate_updates;
19use differential_dataflow::lattice::Lattice;
20use mz_dyncfg::ConfigUpdates;
21use mz_ore::metrics::MetricsRegistry;
22use mz_ore::now::SYSTEM_TIME;
23use mz_persist::cfg::{BlobConfig, ConsensusConfig};
24use mz_persist::location::{Blob, Consensus, ExternalError};
25use mz_persist::unreliable::{UnreliableBlob, UnreliableConsensus, UnreliableHandle};
26use mz_persist_client::async_runtime::IsolatedRuntime;
27use mz_persist_client::cache::StateCache;
28use mz_persist_client::cfg::PersistConfig;
29use mz_persist_client::critical::SinceHandle;
30use mz_persist_client::metrics::Metrics;
31use mz_persist_client::read::{Listen, ListenEvent};
32use mz_persist_client::rpc::PubSubClientConnection;
33use mz_persist_client::write::WriteHandle;
34use mz_persist_client::{Diagnostics, PersistClient, ShardId};
35use timely::PartialOrder;
36use timely::order::TotalOrder;
37use timely::progress::{Antichain, Timestamp};
38use tokio::sync::Mutex;
39use tracing::{debug, info, trace};
40
41use crate::maelstrom::Args;
42use crate::maelstrom::api::{Body, ErrorCode, MaelstromError, NodeId, ReqTxnOp, ResTxnOp};
43use crate::maelstrom::node::{Handle, Service};
44use crate::maelstrom::services::{CachingBlob, MaelstromBlob, MaelstromConsensus};
45use crate::maelstrom::txn_list_append_single::codec_impls::{
46 MaelstromKeySchema, MaelstromValSchema,
47};
48
/// Key of a list-append register: a newtype wrapper around the `u64` key id.
///
/// Ordering, equality and hashing all delegate to the wrapped integer.
#[derive(Clone, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct MaelstromKey(u64);
52
/// Value of a list-append register: the list of `u64` elements appended so
/// far, in append order.
///
/// The default value is the empty list.
#[derive(Clone, Debug, Default, Eq, Ord, PartialEq, PartialOrd)]
pub struct MaelstromVal(Vec<u64>);
56
/// Executes list-append transactions against a single persist shard.
///
/// Each transaction reads the shard's contents as of `read_ts`, evaluates the
/// requested operations against that state, and then tries to
/// compare-and-append the resulting updates at `read_ts + 1`.
#[derive(Debug)]
pub struct Transactor {
    /// Token passed to `maybe_compare_and_downgrade_since`; parsed from the
    /// numeric part of this node's id in `Transactor::new`.
    cads_token: u64,
    /// The shard all reads and writes go to.
    shard_id: ShardId,
    /// Client used to open a fresh leased reader for every short-lived read.
    client: PersistClient,
    /// Critical since handle, downgraded by `advance_since`.
    since: SinceHandle<MaelstromKey, MaelstromVal, u64, i64, u64>,
    /// Write handle used for every compare-and-append.
    write: WriteHandle<MaelstromKey, MaelstromVal, u64, i64>,

    /// Timestamp at which the next read is performed; writes are attempted at
    /// `read_ts + 1`.
    read_ts: u64,

    // Updates accumulated from the initial snapshot plus everything received
    // from `long_lived_listen` so far. `read_long_lived` advances and
    // consolidates them in place.
    long_lived_updates: Vec<(
        (Result<MaelstromKey, String>, Result<MaelstromVal, String>),
        u64,
        i64,
    )>,
    /// Listener opened once in `Transactor::new` and kept for the lifetime of
    /// the transactor.
    long_lived_listen: Listen<MaelstromKey, MaelstromVal, u64, i64>,
}
122
123impl Transactor {
124 pub async fn new(
125 client: &PersistClient,
126 node_id: NodeId,
127 shard_id: ShardId,
128 ) -> Result<Self, MaelstromError> {
129 let cads_token = node_id
130 .0
131 .trim_start_matches('n')
132 .parse::<u64>()
133 .expect("maelstrom node_id should be n followed by an integer");
134
135 let (mut write, mut read) = client
136 .open(
137 shard_id,
138 Arc::new(MaelstromKeySchema),
139 Arc::new(MaelstromValSchema),
140 Diagnostics::from_purpose("maelstrom long-lived"),
141 true,
142 )
143 .await?;
144 let since = client
147 .open_critical_since(
148 shard_id,
149 PersistClient::CONTROLLER_CRITICAL_SINCE,
150 Diagnostics::from_purpose("maelstrom since"),
151 )
152 .await?;
153 let read_ts = Self::maybe_init_shard(&mut write).await?;
154
155 let mut long_lived_updates = Vec::new();
156 let as_of = since.since().clone();
157 let mut updates = read
158 .snapshot_and_fetch(as_of.clone())
159 .await
160 .expect("as_of unexpectedly unavailable");
161 long_lived_updates.append(&mut updates);
162 let long_lived_listen = read
163 .listen(as_of.clone())
164 .await
165 .expect("as_of unexpectedly unavailable");
166
167 Ok(Transactor {
168 client: client.clone(),
169 cads_token,
170 shard_id,
171 since,
172 write,
173 read_ts,
174 long_lived_updates,
175 long_lived_listen,
176 })
177 }
178
179 async fn maybe_init_shard(
182 write: &mut WriteHandle<MaelstromKey, MaelstromVal, u64, i64>,
183 ) -> Result<u64, MaelstromError> {
184 debug!("Transactor::maybe_init");
185 const EMPTY_UPDATES: &[((MaelstromKey, MaelstromVal), u64, i64)] = &[];
186 let ts_min = u64::minimum();
187 let initial_upper = Antichain::from_elem(ts_min);
188 let new_upper = Antichain::from_elem(ts_min + 1);
189 let cas_res = write
190 .compare_and_append(EMPTY_UPDATES, initial_upper.clone(), new_upper.clone())
191 .await;
192 let read_ts = match cas_res? {
193 Ok(()) => 0,
194 Err(mismatch) => Self::extract_ts(&mismatch.current)? - 1,
195 };
196 Ok(read_ts)
197 }
198
199 pub async fn transact(
200 &mut self,
201 req_ops: &[ReqTxnOp],
202 ) -> Result<Vec<ResTxnOp>, MaelstromError> {
203 loop {
204 trace!("transact req={:?}", req_ops);
205 let state = self.read().await?;
206 debug!("transact req={:?} state={:?}", req_ops, state);
207 let (writes, res_ops) = Self::eval_txn(&state, req_ops);
208
209 let write_ts = self.read_ts + 1;
212 let updates = writes
213 .into_iter()
214 .map(|(k, v, diff)| ((k, v), write_ts, diff));
215 let expected_upper = Antichain::from_elem(write_ts);
216 let new_upper = Antichain::from_elem(write_ts + 1);
217 let cas_res = self
218 .write
219 .compare_and_append(updates, expected_upper.clone(), new_upper)
220 .await?;
221 match cas_res {
222 Ok(()) => {
223 self.read_ts = write_ts;
224 self.advance_since().await?;
225 return Ok(res_ops);
226 }
227 Err(mismatch) => {
229 info!(
230 "transact lost the CaS race, retrying: {:?} vs {:?}",
231 mismatch.expected, mismatch.current
232 );
233 self.read_ts = Self::extract_ts(&mismatch.current)? - 1;
234 continue;
235 }
236 }
237 }
238 }
239
240 async fn read_short_lived(
241 &mut self,
242 ) -> Result<
243 (
244 Vec<(
245 (Result<MaelstromKey, String>, Result<MaelstromVal, String>),
246 u64,
247 i64,
248 )>,
249 Antichain<u64>,
250 ),
251 MaelstromError,
252 > {
253 loop {
254 let as_of = Antichain::from_elem(self.read_ts);
258 let since_ts = Self::extract_ts(self.since.since())?;
259 assert!(self.read_ts >= since_ts, "{} vs {}", self.read_ts, since_ts);
260 let snap_ts = since_ts + (self.read_ts - since_ts) / 2;
261 let snap_as_of = Antichain::from_elem(snap_ts);
262
263 let mut read = self
266 .client
267 .open_leased_reader(
268 self.shard_id,
269 Arc::new(MaelstromKeySchema),
270 Arc::new(MaelstromValSchema),
271 Diagnostics::from_purpose("maelstrom short-lived"),
272 true,
273 )
274 .await
275 .expect("codecs should match");
276
277 if !PartialOrder::less_equal(read.since(), &snap_as_of) {
278 info!(
281 "read handle since {:?} is not before as_of {}, fetching a new read_ts and trying again",
282 read.since().elements(),
283 snap_ts,
284 );
285 self.advance_since().await?;
286
287 let recent_upper = self.write.fetch_recent_upper().await;
288 self.read_ts = Self::extract_ts(recent_upper)? - 1;
289 continue;
290 }
291
292 let updates_res = read.snapshot_and_fetch(snap_as_of.clone()).await;
293 let mut updates = match updates_res {
294 Ok(x) => x,
295 Err(since) => {
296 info!(
308 "snapshot cannot serve requested as_of {} since is {:?}, fetching a new read_ts and trying again",
309 snap_ts,
310 since.0.as_option()
311 );
312 self.advance_since().await?;
313
314 let recent_upper = self.write.fetch_recent_upper().await;
315 self.read_ts = Self::extract_ts(recent_upper)? - 1;
316 continue;
317 }
318 };
319
320 let listen_res = read.listen(snap_as_of).await;
321 let listen = match listen_res {
322 Ok(x) => x,
323 Err(since) => {
324 info!(
336 "listen cannot serve requested as_of {} since is {:?}, fetching a new read_ts and trying again",
337 snap_ts,
338 since.0.as_option(),
339 );
340 self.advance_since().await?;
341 let recent_upper = self.write.fetch_recent_upper().await;
342 self.read_ts = Self::extract_ts(recent_upper)? - 1;
343 assert!(
344 PartialOrder::less_than(self.since.since(), recent_upper),
345 "invariant: since {:?} should be held behind the recent upper {:?}",
346 &**self.since.since(),
347 &**recent_upper
348 );
349 continue;
350 }
351 };
352
353 trace!(
354 "read updates from snapshot as_of {}: {:?}",
355 snap_ts, updates
356 );
357 let listen_updates = Self::listen_through(listen, &as_of).await?;
358 trace!(
359 "read updates from listener as_of {} through {}: {:?}",
360 snap_ts, self.read_ts, listen_updates
361 );
362 updates.extend(listen_updates);
363
364 for (_, t, _) in updates.iter_mut() {
366 t.advance_by(as_of.borrow());
367 }
368 consolidate_updates(&mut updates);
369 return Ok((updates, as_of));
370 }
371 }
372
373 async fn read(&mut self) -> Result<BTreeMap<MaelstromKey, MaelstromVal>, MaelstromError> {
374 let (updates, as_of) = self.read_short_lived().await?;
375
376 let long_lived = self.read_long_lived(&as_of).await;
377 assert_eq!(&updates, &long_lived);
378
379 Self::extract_state_map(self.read_ts, updates)
380 }
381
382 async fn listen_through(
383 mut listen: Listen<MaelstromKey, MaelstromVal, u64, i64>,
384 frontier: &Antichain<u64>,
385 ) -> Result<
386 Vec<(
387 (Result<MaelstromKey, String>, Result<MaelstromVal, String>),
388 u64,
389 i64,
390 )>,
391 ExternalError,
392 > {
393 let mut ret = Vec::new();
394 loop {
395 for event in listen.fetch_next().await {
396 match event {
397 ListenEvent::Progress(x) => {
398 if PartialOrder::less_than(frontier, &x) {
402 return Ok(ret);
403 }
404 }
405 ListenEvent::Updates(x) => {
406 ret.extend(x.into_iter().filter(|(_, ts, _)| !frontier.less_than(ts)));
409 }
410 }
411 }
412 }
413 }
414
415 async fn read_long_lived(
416 &mut self,
417 as_of: &Antichain<u64>,
418 ) -> Vec<(
419 (Result<MaelstromKey, String>, Result<MaelstromVal, String>),
420 u64,
421 i64,
422 )> {
423 while PartialOrder::less_equal(self.long_lived_listen.frontier(), as_of) {
424 for event in self.long_lived_listen.fetch_next().await {
425 match event {
426 ListenEvent::Updates(mut updates) => {
427 self.long_lived_updates.append(&mut updates)
428 }
429 ListenEvent::Progress(_) => {} }
431 }
432 }
433 for (_, t, _) in self.long_lived_updates.iter_mut() {
434 t.advance_by(as_of.borrow());
435 }
436 consolidate_updates(&mut self.long_lived_updates);
437
438 self.long_lived_updates
445 .iter()
446 .filter(|(_, t, _)| !as_of.less_than(t))
447 .cloned()
448 .collect()
449 }
450
451 fn extract_state_map(
452 read_ts: u64,
453 updates: Vec<(
454 (Result<MaelstromKey, String>, Result<MaelstromVal, String>),
455 u64,
456 i64,
457 )>,
458 ) -> Result<BTreeMap<MaelstromKey, MaelstromVal>, MaelstromError> {
459 let mut ret = BTreeMap::new();
460 for ((k, v), _, d) in updates {
461 if d != 1 {
462 return Err(MaelstromError {
463 code: ErrorCode::Crash,
464 text: format!("invalid read at time {}", read_ts),
465 });
466 }
467 let k = k.map_err(|err| MaelstromError {
468 code: ErrorCode::Crash,
469 text: format!("invalid key {}", err),
470 })?;
471 let v = v.map_err(|err| MaelstromError {
472 code: ErrorCode::Crash,
473 text: format!("invalid val {}", err),
474 })?;
475 if ret.contains_key(&k) {
476 return Err(MaelstromError {
477 code: ErrorCode::Crash,
478 text: format!("unexpected duplicate key {:?}", k),
479 });
480 }
481 ret.insert(k, v);
482 }
483 Ok(ret)
484 }
485
486 fn eval_txn(
487 state: &BTreeMap<MaelstromKey, MaelstromVal>,
488 req_ops: &[ReqTxnOp],
489 ) -> (Vec<(MaelstromKey, MaelstromVal, i64)>, Vec<ResTxnOp>) {
490 let mut res_ops = Vec::new();
491 let mut updates = Vec::new();
492 let mut txn_state = BTreeMap::new();
493
494 for req_op in req_ops.iter() {
495 match req_op {
496 ReqTxnOp::Read { key } => {
497 let current = txn_state
498 .get(&MaelstromKey(*key))
499 .or_else(|| state.get(&MaelstromKey(*key)));
500 let val = current.cloned().unwrap_or_default().0;
501 res_ops.push(ResTxnOp::Read { key: *key, val })
502 }
503 ReqTxnOp::Append { key, val } => {
504 let current = txn_state
505 .get(&MaelstromKey(*key))
506 .or_else(|| state.get(&MaelstromKey(*key)));
507 let mut vals = match current {
508 Some(val) => {
509 updates.push((MaelstromKey(*key), val.clone(), -1));
511 val.clone()
512 }
513 None => MaelstromVal::default(),
514 };
515 vals.0.push(*val);
516 txn_state.insert(MaelstromKey(*key), vals.clone());
517 updates.push((MaelstromKey(*key), vals, 1));
518 res_ops.push(ResTxnOp::Append {
519 key: key.clone(),
520 val: *val,
521 })
522 }
523 }
524 }
525
526 debug!(
527 "eval_txn\n req={:?}\n res={:?}\n updates={:?}\n state={:?}\n txn_state={:?}",
528 req_ops, res_ops, updates, state, txn_state
529 );
530 (updates, res_ops)
531 }
532
533 async fn advance_since(&mut self) -> Result<(), MaelstromError> {
534 const SINCE_LAG: u64 = 10;
536 let new_since = Antichain::from_elem(self.read_ts.saturating_sub(SINCE_LAG));
537
538 let mut expected_token = self.cads_token;
539 loop {
540 let res = self
541 .since
542 .maybe_compare_and_downgrade_since(&expected_token, (&self.cads_token, &new_since))
543 .await;
544 match res {
545 Some(Ok(latest_since)) => {
546 let since_ts = Self::extract_ts(&latest_since)?;
549 if since_ts > self.read_ts {
550 info!(
551 "since was last updated by {}, forwarding our read_ts from {} to {}",
552 expected_token, self.read_ts, since_ts
553 );
554 self.read_ts = since_ts;
555 }
556
557 return Ok(());
558 }
559 Some(Err(actual_token)) => {
560 debug!(
561 "actual downgrade_since token {} didn't match expected {}, retrying",
562 actual_token, expected_token,
563 );
564 expected_token = actual_token;
565 }
566 None => {
567 panic!("should not no-op `maybe_compare_and_downgrade_since` during testing");
568 }
569 }
570 }
571 }
572
573 fn extract_ts<T: TotalOrder + Copy>(frontier: &Antichain<T>) -> Result<T, MaelstromError> {
574 frontier.as_option().copied().ok_or_else(|| MaelstromError {
575 code: ErrorCode::Crash,
576 text: "shard unexpectedly closed".into(),
577 })
578 }
579}
580
/// A [`Service`] that answers transaction requests with a single
/// [`Transactor`], shared behind an async mutex.
#[derive(Debug)]
pub struct TransactorService(pub Arc<Mutex<Transactor>>);
584
585#[async_trait]
586impl Service for TransactorService {
587 async fn init(args: &Args, handle: &Handle) -> Result<Self, MaelstromError> {
588 let shard_id = handle.maybe_init_shard_id().await?;
592
593 let seed: u64 = SystemTime::now()
596 .duration_since(UNIX_EPOCH)
597 .unwrap_or_default()
598 .subsec_nanos()
599 .into();
600 let should_happen = 1.0 - args.unreliability;
603 let should_timeout = args.unreliability;
609 let unreliable = UnreliableHandle::new(seed, should_happen, should_timeout);
612
613 let mut config =
614 PersistConfig::new_default_configs(&mz_persist_client::BUILD_INFO, SYSTEM_TIME.clone());
615 {
616 let mut updates = ConfigUpdates::default();
619 updates.add(&mz_persist::postgres::USE_POSTGRES_TUNED_QUERIES, true);
620 config.apply_from(&updates);
621 }
622
623 let metrics = Arc::new(Metrics::new(&config, &MetricsRegistry::new()));
624
625 let blob = match &args.blob_uri {
627 Some(blob_uri) => {
628 let cfg = BlobConfig::try_from(
629 blob_uri,
630 Box::new(config.clone()),
631 metrics.s3_blob.clone(),
632 Arc::clone(&config.configs),
633 )
634 .await
635 .expect("blob_uri should be valid");
636 loop {
637 match cfg.clone().open().await {
638 Ok(x) => break x,
639 Err(err) => {
640 info!("failed to open blob, trying again: {}", err);
641 }
642 }
643 }
644 }
645 None => MaelstromBlob::new(handle.clone()),
646 };
647 let blob: Arc<dyn Blob> = Arc::new(UnreliableBlob::new(blob, unreliable.clone()));
648 let blob = CachingBlob::new(blob);
656 config.critical_downgrade_interval = Duration::from_secs(0);
659 config.set_state_versions_recent_live_diffs_limit(5);
661 let consensus = match &args.consensus_uri {
662 Some(consensus_uri) => {
663 let cfg = ConsensusConfig::try_from(
664 consensus_uri,
665 Box::new(config.clone()),
666 metrics.postgres_consensus.clone(),
667 Arc::clone(&config.configs),
668 )
669 .expect("consensus_uri should be valid");
670 loop {
671 match cfg.clone().open().await {
672 Ok(x) => break x,
673 Err(err) => {
674 info!("failed to open consensus, trying again: {}", err);
675 }
676 }
677 }
678 }
679 None => MaelstromConsensus::new(handle.clone()),
680 };
681 let consensus: Arc<dyn Consensus> =
682 Arc::new(UnreliableConsensus::new(consensus, unreliable));
683
684 let isolated_runtime = Arc::new(IsolatedRuntime::new_for_tests());
686 let pubsub_sender = PubSubClientConnection::noop().sender;
687 let shared_states = Arc::new(StateCache::new(
688 &config,
689 Arc::clone(&metrics),
690 Arc::clone(&pubsub_sender),
691 ));
692 let client = PersistClient::new(
693 config,
694 blob,
695 consensus,
696 metrics,
697 isolated_runtime,
698 shared_states,
699 pubsub_sender,
700 )?;
701 let transactor = Transactor::new(&client, handle.node_id(), shard_id).await?;
702 let service = TransactorService(Arc::new(Mutex::new(transactor)));
703 Ok(service)
704 }
705
706 async fn eval(&self, handle: Handle, src: NodeId, req: Body) {
707 match req {
708 Body::ReqTxn { msg_id, txn } => {
709 let in_reply_to = msg_id;
710 match self.0.lock().await.transact(&txn).await {
711 Ok(txn) => handle.send_res(src, |msg_id| Body::ResTxn {
712 msg_id,
713 in_reply_to,
714 txn,
715 }),
716 Err(MaelstromError { code, text }) => {
717 handle.send_res(src, |msg_id| Body::Error {
718 msg_id: Some(msg_id),
719 in_reply_to,
720 code,
721 text,
722 })
723 }
724 }
725 }
726 req => unimplemented!("unsupported req: {:?}", req),
727 }
728 }
729}
730
731mod codec_impls {
732 use arrow::array::{BinaryArray, BinaryBuilder, UInt64Array, UInt64Builder};
733 use arrow::datatypes::ToByteSlice;
734 use bytes::Bytes;
735 use mz_persist_types::Codec;
736 use mz_persist_types::codec_impls::{
737 SimpleColumnarData, SimpleColumnarDecoder, SimpleColumnarEncoder,
738 };
739 use mz_persist_types::columnar::Schema;
740 use mz_persist_types::stats::NoneStats;
741
742 use crate::maelstrom::txn_list_append_single::{MaelstromKey, MaelstromVal};
743
744 impl Codec for MaelstromKey {
745 type Storage = ();
746 type Schema = MaelstromKeySchema;
747
748 fn codec_name() -> String {
749 "MaelstromKey".into()
750 }
751
752 fn encode<B>(&self, buf: &mut B)
753 where
754 B: bytes::BufMut,
755 {
756 let bytes = serde_json::to_vec(&self.0).expect("failed to encode key");
757 buf.put(bytes.as_slice());
758 }
759
760 fn decode<'a>(buf: &'a [u8], _schema: &MaelstromKeySchema) -> Result<Self, String> {
761 Ok(MaelstromKey(
762 serde_json::from_slice(buf).map_err(|err| err.to_string())?,
763 ))
764 }
765
766 fn encode_schema(_schema: &Self::Schema) -> Bytes {
767 Bytes::new()
768 }
769
770 fn decode_schema(buf: &Bytes) -> Self::Schema {
771 assert_eq!(*buf, Bytes::new());
772 MaelstromKeySchema
773 }
774 }
775
776 impl SimpleColumnarData for MaelstromKey {
777 type ArrowBuilder = UInt64Builder;
778 type ArrowColumn = UInt64Array;
779
780 fn goodbytes(builder: &Self::ArrowBuilder) -> usize {
781 builder.values_slice().to_byte_slice().len()
782 }
783
784 fn push(&self, builder: &mut Self::ArrowBuilder) {
785 builder.append_value(self.0);
786 }
787 fn push_null(builder: &mut Self::ArrowBuilder) {
788 builder.append_null();
789 }
790
791 fn read(&mut self, idx: usize, column: &Self::ArrowColumn) {
792 *self = MaelstromKey(column.value(idx));
793 }
794 }
795
796 #[derive(Debug, PartialEq)]
797 pub struct MaelstromKeySchema;
798
799 impl Schema<MaelstromKey> for MaelstromKeySchema {
800 type ArrowColumn = UInt64Array;
801 type Statistics = NoneStats;
802
803 type Decoder = SimpleColumnarDecoder<MaelstromKey>;
804 type Encoder = SimpleColumnarEncoder<MaelstromKey>;
805
806 fn encoder(&self) -> Result<Self::Encoder, anyhow::Error> {
807 Ok(SimpleColumnarEncoder::default())
808 }
809
810 fn decoder(&self, col: Self::ArrowColumn) -> Result<Self::Decoder, anyhow::Error> {
811 Ok(SimpleColumnarDecoder::new(col))
812 }
813 }
814
815 impl Codec for MaelstromVal {
816 type Storage = ();
817 type Schema = MaelstromValSchema;
818
819 fn codec_name() -> String {
820 "MaelstromVal".into()
821 }
822
823 fn encode<B>(&self, buf: &mut B)
824 where
825 B: bytes::BufMut,
826 {
827 let bytes = serde_json::to_vec(&self.0).expect("failed to encode val");
828 buf.put(bytes.as_slice());
829 }
830
831 fn decode<'a>(buf: &'a [u8], _schema: &MaelstromValSchema) -> Result<Self, String> {
832 Ok(MaelstromVal(
833 serde_json::from_slice(buf).map_err(|err| err.to_string())?,
834 ))
835 }
836
837 fn encode_schema(_schema: &Self::Schema) -> Bytes {
838 Bytes::new()
839 }
840
841 fn decode_schema(buf: &Bytes) -> Self::Schema {
842 assert_eq!(*buf, Bytes::new());
843 MaelstromValSchema
844 }
845 }
846
847 impl SimpleColumnarData for MaelstromVal {
848 type ArrowBuilder = BinaryBuilder;
849 type ArrowColumn = BinaryArray;
850
851 fn goodbytes(builder: &Self::ArrowBuilder) -> usize {
852 builder.values_slice().to_byte_slice().len()
853 }
854
855 fn push(&self, builder: &mut Self::ArrowBuilder) {
856 builder.append_value(&self.encode_to_vec());
857 }
858 fn push_null(builder: &mut Self::ArrowBuilder) {
859 builder.append_null()
860 }
861
862 fn read(&mut self, idx: usize, column: &Self::ArrowColumn) {
863 *self = MaelstromVal::decode(column.value(idx), &MaelstromValSchema)
864 .expect("should be valid MaelstromVal");
865 }
866 }
867
868 #[derive(Debug, PartialEq)]
869 pub struct MaelstromValSchema;
870
871 impl Schema<MaelstromVal> for MaelstromValSchema {
872 type ArrowColumn = BinaryArray;
873 type Statistics = NoneStats;
874
875 type Decoder = SimpleColumnarDecoder<MaelstromVal>;
876 type Encoder = SimpleColumnarEncoder<MaelstromVal>;
877
878 fn encoder(&self) -> Result<Self::Encoder, anyhow::Error> {
879 Ok(SimpleColumnarEncoder::default())
880 }
881
882 fn decoder(&self, col: Self::ArrowColumn) -> Result<Self::Decoder, anyhow::Error> {
883 Ok(SimpleColumnarDecoder::new(col))
884 }
885 }
886}