1use std::collections::BTreeMap;
14use std::sync::Arc;
15use std::time::{Duration, SystemTime, UNIX_EPOCH};
16
17use async_trait::async_trait;
18use differential_dataflow::consolidation::consolidate_updates;
19use differential_dataflow::lattice::Lattice;
20use mz_ore::metrics::MetricsRegistry;
21use mz_ore::now::SYSTEM_TIME;
22use mz_persist::cfg::{BlobConfig, ConsensusConfig};
23use mz_persist::location::{Blob, Consensus, ExternalError};
24use mz_persist::unreliable::{UnreliableBlob, UnreliableConsensus, UnreliableHandle};
25use mz_persist_client::async_runtime::IsolatedRuntime;
26use mz_persist_client::cache::StateCache;
27use mz_persist_client::cfg::PersistConfig;
28use mz_persist_client::critical::SinceHandle;
29use mz_persist_client::metrics::Metrics;
30use mz_persist_client::read::{Listen, ListenEvent};
31use mz_persist_client::rpc::PubSubClientConnection;
32use mz_persist_client::write::WriteHandle;
33use mz_persist_client::{Diagnostics, PersistClient, ShardId};
34use timely::PartialOrder;
35use timely::order::TotalOrder;
36use timely::progress::{Antichain, Timestamp};
37use tokio::sync::Mutex;
38use tracing::{debug, info, trace};
39
40use crate::maelstrom::Args;
41use crate::maelstrom::api::{Body, ErrorCode, MaelstromError, NodeId, ReqTxnOp, ResTxnOp};
42use crate::maelstrom::node::{Handle, Service};
43use crate::maelstrom::services::{CachingBlob, MaelstromBlob, MaelstromConsensus};
44use crate::maelstrom::txn_list_append_single::codec_impls::{
45 MaelstromKeySchema, MaelstromValSchema,
46};
47
/// Key type stored in the persist shard: a newtype over the `u64` key that
/// transaction operations (`ReqTxnOp::Read` / `ReqTxnOp::Append`) refer to.
#[derive(Debug, Clone, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct MaelstromKey(u64);
51
/// Value type stored in the persist shard: the complete list of `u64`s that
/// have been appended under a single [`MaelstromKey`]. The default value is the
/// empty list.
#[derive(Debug, Clone, Default, PartialEq, Eq, PartialOrd, Ord)]
pub struct MaelstromVal(Vec<u64>);
55
/// Executes list-append transactions against a single persist shard.
///
/// Each transaction reads the shard contents as of `read_ts`, evaluates the
/// requested operations against that state, and then tries to
/// compare-and-append the resulting updates at `read_ts + 1`, retrying when
/// the compare-and-append reports an upper mismatch.
#[derive(Debug)]
pub struct Transactor {
    /// Numeric part of this node's maelstrom id; used as this node's token when
    /// compare-and-downgrading the critical since.
    cads_token: u64,
    /// The shard every handle in this struct operates on.
    shard_id: ShardId,
    /// Client used to open a fresh leased reader for every short-lived read.
    client: PersistClient,
    /// Critical since handle that is downgraded after successful writes and
    /// when a read cannot be served.
    since: SinceHandle<MaelstromKey, MaelstromVal, u64, i64, u64>,
    /// Write handle used for compare-and-append and for fetching the recent
    /// upper.
    write: WriteHandle<MaelstromKey, MaelstromVal, u64, i64>,

    /// Timestamp at which the next read is performed. It is set to one less
    /// than the most recently observed shard upper, and may be forwarded to
    /// the since when the since has moved past it.
    read_ts: u64,

    /// Every update seen by the long-lived reader so far: the initial snapshot
    /// plus whatever `long_lived_listen` has emitted since.
    long_lived_updates: Vec<(
        (Result<MaelstromKey, String>, Result<MaelstromVal, String>),
        u64,
        i64,
    )>,
    /// Listener opened once at construction; its output is cross-checked
    /// against the short-lived read on every call to `read`.
    long_lived_listen: Listen<MaelstromKey, MaelstromVal, u64, i64>,
}
121
122impl Transactor {
123 pub async fn new(
124 client: &PersistClient,
125 node_id: NodeId,
126 shard_id: ShardId,
127 ) -> Result<Self, MaelstromError> {
128 let cads_token = node_id
129 .0
130 .trim_start_matches('n')
131 .parse::<u64>()
132 .expect("maelstrom node_id should be n followed by an integer");
133
134 let (mut write, mut read) = client
135 .open(
136 shard_id,
137 Arc::new(MaelstromKeySchema),
138 Arc::new(MaelstromValSchema),
139 Diagnostics::from_purpose("maelstrom long-lived"),
140 true,
141 )
142 .await?;
143 let since = client
146 .open_critical_since(
147 shard_id,
148 PersistClient::CONTROLLER_CRITICAL_SINCE,
149 Diagnostics::from_purpose("maelstrom since"),
150 )
151 .await?;
152 let read_ts = Self::maybe_init_shard(&mut write).await?;
153
154 let mut long_lived_updates = Vec::new();
155 let as_of = since.since().clone();
156 let mut updates = read
157 .snapshot_and_fetch(as_of.clone())
158 .await
159 .expect("as_of unexpectedly unavailable");
160 long_lived_updates.append(&mut updates);
161 let long_lived_listen = read
162 .listen(as_of.clone())
163 .await
164 .expect("as_of unexpectedly unavailable");
165
166 Ok(Transactor {
167 client: client.clone(),
168 cads_token,
169 shard_id,
170 since,
171 write,
172 read_ts,
173 long_lived_updates,
174 long_lived_listen,
175 })
176 }
177
178 async fn maybe_init_shard(
181 write: &mut WriteHandle<MaelstromKey, MaelstromVal, u64, i64>,
182 ) -> Result<u64, MaelstromError> {
183 debug!("Transactor::maybe_init");
184 const EMPTY_UPDATES: &[((MaelstromKey, MaelstromVal), u64, i64)] = &[];
185 let ts_min = u64::minimum();
186 let initial_upper = Antichain::from_elem(ts_min);
187 let new_upper = Antichain::from_elem(ts_min + 1);
188 let cas_res = write
189 .compare_and_append(EMPTY_UPDATES, initial_upper.clone(), new_upper.clone())
190 .await;
191 let read_ts = match cas_res? {
192 Ok(()) => 0,
193 Err(mismatch) => Self::extract_ts(&mismatch.current)? - 1,
194 };
195 Ok(read_ts)
196 }
197
198 pub async fn transact(
199 &mut self,
200 req_ops: &[ReqTxnOp],
201 ) -> Result<Vec<ResTxnOp>, MaelstromError> {
202 loop {
203 trace!("transact req={:?}", req_ops);
204 let state = self.read().await?;
205 debug!("transact req={:?} state={:?}", req_ops, state);
206 let (writes, res_ops) = Self::eval_txn(&state, req_ops);
207
208 let write_ts = self.read_ts + 1;
211 let updates = writes
212 .into_iter()
213 .map(|(k, v, diff)| ((k, v), write_ts, diff));
214 let expected_upper = Antichain::from_elem(write_ts);
215 let new_upper = Antichain::from_elem(write_ts + 1);
216 let cas_res = self
217 .write
218 .compare_and_append(updates, expected_upper.clone(), new_upper)
219 .await?;
220 match cas_res {
221 Ok(()) => {
222 self.read_ts = write_ts;
223 self.advance_since().await?;
224 return Ok(res_ops);
225 }
226 Err(mismatch) => {
228 info!(
229 "transact lost the CaS race, retrying: {:?} vs {:?}",
230 mismatch.expected, mismatch.current
231 );
232 self.read_ts = Self::extract_ts(&mismatch.current)? - 1;
233 continue;
234 }
235 }
236 }
237 }
238
/// Reads the shard contents as of `self.read_ts` using a freshly opened leased
/// reader, returning the consolidated updates together with the `as_of` they
/// were read at.
///
/// The read is deliberately split in two: a snapshot at `snap_ts`, the
/// midpoint between the current since and `read_ts`, followed by a listen
/// from `snap_ts` through `read_ts`. If either the snapshot or the listen
/// cannot be served at `snap_ts`, the since is advanced, `read_ts` is reset to
/// one less than the recent upper, and the whole read is retried.
///
/// # Errors
///
/// Propagates errors from advancing the since, from listening, and from a
/// closed shard.
///
/// # Panics
///
/// Panics if `read_ts` is behind the since, if the leased reader cannot be
/// opened, or if the since is not strictly behind the recent upper after a
/// failed listen.
async fn read_short_lived(
    &mut self,
) -> Result<
    (
        Vec<(
            (Result<MaelstromKey, String>, Result<MaelstromVal, String>),
            u64,
            i64,
        )>,
        Antichain<u64>,
    ),
    MaelstromError,
> {
    loop {
        // We want the collection as of read_ts, but the snapshot is taken at
        // the midpoint of [since_ts, read_ts]; the listen below covers the rest.
        let as_of = Antichain::from_elem(self.read_ts);
        let since_ts = Self::extract_ts(self.since.since())?;
        assert!(self.read_ts >= since_ts, "{} vs {}", self.read_ts, since_ts);
        let snap_ts = since_ts + (self.read_ts - since_ts) / 2;
        let snap_as_of = Antichain::from_elem(snap_ts);

        // A new leased reader is opened on every attempt.
        let mut read = self
            .client
            .open_leased_reader(
                self.shard_id,
                Arc::new(MaelstromKeySchema),
                Arc::new(MaelstromValSchema),
                Diagnostics::from_purpose("maelstrom short-lived"),
                true,
            )
            .await
            .expect("codecs should match");

        let updates_res = read.snapshot_and_fetch(snap_as_of.clone()).await;
        let mut updates = match updates_res {
            Ok(x) => x,
            Err(since) => {
                // The snapshot could not be served at snap_ts: advance our
                // since, pick a new read_ts just below the recent upper, retry.
                info!(
                    "snapshot cannot serve requested as_of {} since is {:?}, fetching a new read_ts and trying again",
                    snap_ts,
                    since.0.as_option()
                );
                self.advance_since().await?;

                let recent_upper = self.write.fetch_recent_upper().await;
                self.read_ts = Self::extract_ts(recent_upper)? - 1;
                continue;
            }
        };

        let listen_res = read.listen(snap_as_of).await;
        let listen = match listen_res {
            Ok(x) => x,
            Err(since) => {
                // Same recovery as for the snapshot, plus a sanity check that
                // the since is still strictly behind the upper.
                info!(
                    "listen cannot serve requested as_of {} since is {:?}, fetching a new read_ts and trying again",
                    snap_ts,
                    since.0.as_option(),
                );
                self.advance_since().await?;
                let recent_upper = self.write.fetch_recent_upper().await;
                self.read_ts = Self::extract_ts(recent_upper)? - 1;
                assert!(
                    PartialOrder::less_than(self.since.since(), recent_upper),
                    "invariant: since {:?} should be held behind the recent upper {:?}",
                    &**self.since.since(),
                    &**recent_upper
                );
                continue;
            }
        };

        trace!(
            "read updates from snapshot as_of {}: {:?}",
            snap_ts, updates
        );
        // Collect everything the listener emits up to and including as_of.
        let listen_updates = Self::listen_through(listen, &as_of).await?;
        trace!(
            "read updates from listener as_of {} through {}: {:?}",
            snap_ts, self.read_ts, listen_updates
        );
        updates.extend(listen_updates);

        // Forward every timestamp to as_of and consolidate, which yields the
        // contents of the collection as of `as_of`.
        for (_, t, _) in updates.iter_mut() {
            t.advance_by(as_of.borrow());
        }
        consolidate_updates(&mut updates);
        return Ok((updates, as_of));
    }
}
356
357 async fn read(&mut self) -> Result<BTreeMap<MaelstromKey, MaelstromVal>, MaelstromError> {
358 let (updates, as_of) = self.read_short_lived().await?;
359
360 let long_lived = self.read_long_lived(&as_of).await;
361 assert_eq!(&updates, &long_lived);
362
363 Self::extract_state_map(self.read_ts, updates)
364 }
365
366 async fn listen_through(
367 mut listen: Listen<MaelstromKey, MaelstromVal, u64, i64>,
368 frontier: &Antichain<u64>,
369 ) -> Result<
370 Vec<(
371 (Result<MaelstromKey, String>, Result<MaelstromVal, String>),
372 u64,
373 i64,
374 )>,
375 ExternalError,
376 > {
377 let mut ret = Vec::new();
378 loop {
379 for event in listen.fetch_next().await {
380 match event {
381 ListenEvent::Progress(x) => {
382 if PartialOrder::less_than(frontier, &x) {
386 return Ok(ret);
387 }
388 }
389 ListenEvent::Updates(x) => {
390 ret.extend(x.into_iter().filter(|(_, ts, _)| !frontier.less_than(ts)));
393 }
394 }
395 }
396 }
397 }
398
399 async fn read_long_lived(
400 &mut self,
401 as_of: &Antichain<u64>,
402 ) -> Vec<(
403 (Result<MaelstromKey, String>, Result<MaelstromVal, String>),
404 u64,
405 i64,
406 )> {
407 while PartialOrder::less_equal(self.long_lived_listen.frontier(), as_of) {
408 for event in self.long_lived_listen.fetch_next().await {
409 match event {
410 ListenEvent::Updates(mut updates) => {
411 self.long_lived_updates.append(&mut updates)
412 }
413 ListenEvent::Progress(_) => {} }
415 }
416 }
417 for (_, t, _) in self.long_lived_updates.iter_mut() {
418 t.advance_by(as_of.borrow());
419 }
420 consolidate_updates(&mut self.long_lived_updates);
421
422 self.long_lived_updates
429 .iter()
430 .filter(|(_, t, _)| !as_of.less_than(t))
431 .cloned()
432 .collect()
433 }
434
435 fn extract_state_map(
436 read_ts: u64,
437 updates: Vec<(
438 (Result<MaelstromKey, String>, Result<MaelstromVal, String>),
439 u64,
440 i64,
441 )>,
442 ) -> Result<BTreeMap<MaelstromKey, MaelstromVal>, MaelstromError> {
443 let mut ret = BTreeMap::new();
444 for ((k, v), _, d) in updates {
445 if d != 1 {
446 return Err(MaelstromError {
447 code: ErrorCode::Crash,
448 text: format!("invalid read at time {}", read_ts),
449 });
450 }
451 let k = k.map_err(|err| MaelstromError {
452 code: ErrorCode::Crash,
453 text: format!("invalid key {}", err),
454 })?;
455 let v = v.map_err(|err| MaelstromError {
456 code: ErrorCode::Crash,
457 text: format!("invalid val {}", err),
458 })?;
459 if ret.contains_key(&k) {
460 return Err(MaelstromError {
461 code: ErrorCode::Crash,
462 text: format!("unexpected duplicate key {:?}", k),
463 });
464 }
465 ret.insert(k, v);
466 }
467 Ok(ret)
468 }
469
470 fn eval_txn(
471 state: &BTreeMap<MaelstromKey, MaelstromVal>,
472 req_ops: &[ReqTxnOp],
473 ) -> (Vec<(MaelstromKey, MaelstromVal, i64)>, Vec<ResTxnOp>) {
474 let mut res_ops = Vec::new();
475 let mut updates = Vec::new();
476 let mut txn_state = BTreeMap::new();
477
478 for req_op in req_ops.iter() {
479 match req_op {
480 ReqTxnOp::Read { key } => {
481 let current = txn_state
482 .get(&MaelstromKey(*key))
483 .or_else(|| state.get(&MaelstromKey(*key)));
484 let val = current.cloned().unwrap_or_default().0;
485 res_ops.push(ResTxnOp::Read { key: *key, val })
486 }
487 ReqTxnOp::Append { key, val } => {
488 let current = txn_state
489 .get(&MaelstromKey(*key))
490 .or_else(|| state.get(&MaelstromKey(*key)));
491 let mut vals = match current {
492 Some(val) => {
493 updates.push((MaelstromKey(*key), val.clone(), -1));
495 val.clone()
496 }
497 None => MaelstromVal::default(),
498 };
499 vals.0.push(*val);
500 txn_state.insert(MaelstromKey(*key), vals.clone());
501 updates.push((MaelstromKey(*key), vals, 1));
502 res_ops.push(ResTxnOp::Append {
503 key: key.clone(),
504 val: *val,
505 })
506 }
507 }
508 }
509
510 debug!(
511 "eval_txn\n req={:?}\n res={:?}\n updates={:?}\n state={:?}\n txn_state={:?}",
512 req_ops, res_ops, updates, state, txn_state
513 );
514 (updates, res_ops)
515 }
516
517 async fn advance_since(&mut self) -> Result<(), MaelstromError> {
518 const SINCE_LAG: u64 = 10;
520 let new_since = Antichain::from_elem(self.read_ts.saturating_sub(SINCE_LAG));
521
522 let mut expected_token = self.cads_token;
523 loop {
524 let res = self
525 .since
526 .maybe_compare_and_downgrade_since(&expected_token, (&self.cads_token, &new_since))
527 .await;
528 match res {
529 Some(Ok(latest_since)) => {
530 let since_ts = Self::extract_ts(&latest_since)?;
533 if since_ts > self.read_ts {
534 info!(
535 "since was last updated by {}, forwarding our read_ts from {} to {}",
536 expected_token, self.read_ts, since_ts
537 );
538 self.read_ts = since_ts;
539 }
540
541 return Ok(());
542 }
543 Some(Err(actual_token)) => {
544 debug!(
545 "actual downgrade_since token {} didn't match expected {}, retrying",
546 actual_token, expected_token,
547 );
548 expected_token = actual_token;
549 }
550 None => {
551 panic!("should not no-op `maybe_compare_and_downgrade_since` during testing");
552 }
553 }
554 }
555 }
556
557 fn extract_ts<T: TotalOrder + Copy>(frontier: &Antichain<T>) -> Result<T, MaelstromError> {
558 frontier.as_option().copied().ok_or_else(|| MaelstromError {
559 code: ErrorCode::Crash,
560 text: "shard unexpectedly closed".into(),
561 })
562 }
563}
564
/// Maelstrom [`Service`] wrapper around a [`Transactor`]. The transactor sits
/// behind an async mutex that is locked for each transaction request.
#[derive(Debug)]
pub struct TransactorService(pub Arc<Mutex<Transactor>>);
568
569#[async_trait]
570impl Service for TransactorService {
/// Builds the persist client stack for this node and wraps a [`Transactor`]
/// on top of it.
///
/// Blob and consensus are taken from `args.blob_uri` / `args.consensus_uri`
/// when set and from the maelstrom-backed implementations otherwise. Both are
/// wrapped in the "unreliable" fault-injecting adapters, which share one
/// handle configured from `args.unreliability`.
///
/// # Errors
///
/// Propagates errors from shard id initialization, from constructing the
/// `PersistClient`, and from `Transactor::new`.
///
/// # Panics
///
/// Panics if `blob_uri` or `consensus_uri` is set but invalid.
async fn init(args: &Args, handle: &Handle) -> Result<Self, MaelstromError> {
    let shard_id = handle.maybe_init_shard_id().await?;

    // Seed the fault injector from the sub-second nanoseconds of the wall
    // clock (zero if the clock reads earlier than the Unix epoch).
    let seed: u64 = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .subsec_nanos()
        .into();
    // Both fault-injection knobs are derived from the single `unreliability`
    // argument.
    let should_happen = 1.0 - args.unreliability;
    let should_timeout = args.unreliability;
    let unreliable = UnreliableHandle::new(seed, should_happen, should_timeout);

    let mut config =
        PersistConfig::new_default_configs(&mz_persist_client::BUILD_INFO, SYSTEM_TIME.clone());
    let metrics = Arc::new(Metrics::new(&config, &MetricsRegistry::new()));

    let blob = match &args.blob_uri {
        Some(blob_uri) => {
            let cfg = BlobConfig::try_from(
                blob_uri,
                Box::new(config.clone()),
                metrics.s3_blob.clone(),
                Arc::clone(&config.configs),
            )
            .await
            .expect("blob_uri should be valid");
            // Retry opening until it succeeds; there is no delay between
            // attempts.
            loop {
                match cfg.clone().open().await {
                    Ok(x) => break x,
                    Err(err) => {
                        info!("failed to open blob, trying again: {}", err);
                    }
                }
            }
        }
        None => MaelstromBlob::new(handle.clone()),
    };
    // Layering, innermost first: real blob, fault injection, cache.
    let blob: Arc<dyn Blob> = Arc::new(UnreliableBlob::new(blob, unreliable.clone()));
    let blob = CachingBlob::new(blob);
    // NB: these two overrides happen after the blob config above cloned
    // `config`, so that clone does not carry them.
    config.critical_downgrade_interval = Duration::from_secs(0);
    config.set_state_versions_recent_live_diffs_limit(5);
    let consensus = match &args.consensus_uri {
        Some(consensus_uri) => {
            let cfg = ConsensusConfig::try_from(
                consensus_uri,
                Box::new(config.clone()),
                metrics.postgres_consensus.clone(),
            )
            .expect("consensus_uri should be valid");
            // Same retry-until-open loop as for the blob.
            loop {
                match cfg.clone().open().await {
                    Ok(x) => break x,
                    Err(err) => {
                        info!("failed to open consensus, trying again: {}", err);
                    }
                }
            }
        }
        None => MaelstromConsensus::new(handle.clone()),
    };
    let consensus: Arc<dyn Consensus> =
        Arc::new(UnreliableConsensus::new(consensus, unreliable));

    let isolated_runtime = Arc::new(IsolatedRuntime::default());
    // Pubsub is wired up with the no-op connection.
    let pubsub_sender = PubSubClientConnection::noop().sender;
    let shared_states = Arc::new(StateCache::new(
        &config,
        Arc::clone(&metrics),
        Arc::clone(&pubsub_sender),
    ));
    let client = PersistClient::new(
        config,
        blob,
        consensus,
        metrics,
        isolated_runtime,
        shared_states,
        pubsub_sender,
    )?;
    let transactor = Transactor::new(&client, handle.node_id(), shard_id).await?;
    let service = TransactorService(Arc::new(Mutex::new(transactor)));
    Ok(service)
}
680
681 async fn eval(&self, handle: Handle, src: NodeId, req: Body) {
682 match req {
683 Body::ReqTxn { msg_id, txn } => {
684 let in_reply_to = msg_id;
685 match self.0.lock().await.transact(&txn).await {
686 Ok(txn) => handle.send_res(src, |msg_id| Body::ResTxn {
687 msg_id,
688 in_reply_to,
689 txn,
690 }),
691 Err(MaelstromError { code, text }) => {
692 handle.send_res(src, |msg_id| Body::Error {
693 msg_id: Some(msg_id),
694 in_reply_to,
695 code,
696 text,
697 })
698 }
699 }
700 }
701 req => unimplemented!("unsupported req: {:?}", req),
702 }
703 }
704}
705
706mod codec_impls {
707 use arrow::array::{BinaryArray, BinaryBuilder, UInt64Array, UInt64Builder};
708 use arrow::datatypes::ToByteSlice;
709 use bytes::Bytes;
710 use mz_persist_types::Codec;
711 use mz_persist_types::codec_impls::{
712 SimpleColumnarData, SimpleColumnarDecoder, SimpleColumnarEncoder,
713 };
714 use mz_persist_types::columnar::Schema;
715 use mz_persist_types::stats::NoneStats;
716
717 use crate::maelstrom::txn_list_append_single::{MaelstromKey, MaelstromVal};
718
719 impl Codec for MaelstromKey {
720 type Storage = ();
721 type Schema = MaelstromKeySchema;
722
723 fn codec_name() -> String {
724 "MaelstromKey".into()
725 }
726
727 fn encode<B>(&self, buf: &mut B)
728 where
729 B: bytes::BufMut,
730 {
731 let bytes = serde_json::to_vec(&self.0).expect("failed to encode key");
732 buf.put(bytes.as_slice());
733 }
734
735 fn decode<'a>(buf: &'a [u8], _schema: &MaelstromKeySchema) -> Result<Self, String> {
736 Ok(MaelstromKey(
737 serde_json::from_slice(buf).map_err(|err| err.to_string())?,
738 ))
739 }
740
741 fn encode_schema(_schema: &Self::Schema) -> Bytes {
742 Bytes::new()
743 }
744
745 fn decode_schema(buf: &Bytes) -> Self::Schema {
746 assert_eq!(*buf, Bytes::new());
747 MaelstromKeySchema
748 }
749 }
750
/// Columnar representation of [`MaelstromKey`]: the wrapped `u64` is stored
/// directly in an arrow `UInt64` column.
impl SimpleColumnarData for MaelstromKey {
    type ArrowBuilder = UInt64Builder;
    type ArrowColumn = UInt64Array;

    /// Byte length of the values slice accumulated in `builder` so far.
    fn goodbytes(builder: &Self::ArrowBuilder) -> usize {
        builder.values_slice().to_byte_slice().len()
    }

    /// Appends this key's `u64` to `builder`.
    fn push(&self, builder: &mut Self::ArrowBuilder) {
        builder.append_value(self.0);
    }
    /// Appends a null entry to `builder`.
    fn push_null(builder: &mut Self::ArrowBuilder) {
        builder.append_null();
    }

    /// Overwrites `self` with the key stored at position `idx` of `column`.
    fn read(&mut self, idx: usize, column: &Self::ArrowColumn) {
        *self = MaelstromKey(column.value(idx));
    }
}
770
/// Stateless schema marker for [`MaelstromKey`] columns.
#[derive(Debug, PartialEq)]
pub struct MaelstromKeySchema;
773
/// Wires [`MaelstromKey`] into the columnar `Schema` trait using the
/// `SimpleColumnar*` helpers; no column statistics are collected (`NoneStats`).
impl Schema<MaelstromKey> for MaelstromKeySchema {
    type ArrowColumn = UInt64Array;
    type Statistics = NoneStats;

    type Decoder = SimpleColumnarDecoder<MaelstromKey>;
    type Encoder = SimpleColumnarEncoder<MaelstromKey>;

    /// Returns a default-constructed encoder; this never fails.
    fn encoder(&self) -> Result<Self::Encoder, anyhow::Error> {
        Ok(SimpleColumnarEncoder::default())
    }

    /// Wraps `col` in a decoder; this never fails.
    fn decoder(&self, col: Self::ArrowColumn) -> Result<Self::Decoder, anyhow::Error> {
        Ok(SimpleColumnarDecoder::new(col))
    }
}
789
790 impl Codec for MaelstromVal {
791 type Storage = ();
792 type Schema = MaelstromValSchema;
793
794 fn codec_name() -> String {
795 "MaelstromVal".into()
796 }
797
798 fn encode<B>(&self, buf: &mut B)
799 where
800 B: bytes::BufMut,
801 {
802 let bytes = serde_json::to_vec(&self.0).expect("failed to encode val");
803 buf.put(bytes.as_slice());
804 }
805
806 fn decode<'a>(buf: &'a [u8], _schema: &MaelstromValSchema) -> Result<Self, String> {
807 Ok(MaelstromVal(
808 serde_json::from_slice(buf).map_err(|err| err.to_string())?,
809 ))
810 }
811
812 fn encode_schema(_schema: &Self::Schema) -> Bytes {
813 Bytes::new()
814 }
815
816 fn decode_schema(buf: &Bytes) -> Self::Schema {
817 assert_eq!(*buf, Bytes::new());
818 MaelstromValSchema
819 }
820 }
821
/// Columnar representation of [`MaelstromVal`]: each value is stored as one
/// entry of an arrow `Binary` column and read back through `Codec::decode`.
impl SimpleColumnarData for MaelstromVal {
    type ArrowBuilder = BinaryBuilder;
    type ArrowColumn = BinaryArray;

    /// Byte length of the values slice accumulated in `builder` so far.
    fn goodbytes(builder: &Self::ArrowBuilder) -> usize {
        builder.values_slice().to_byte_slice().len()
    }

    /// Appends the bytes from `encode_to_vec` to `builder`.
    // NOTE(review): `encode_to_vec` is presumably the `Codec` helper that runs
    // `encode` into a fresh buffer — confirm against `mz_persist_types::Codec`.
    fn push(&self, builder: &mut Self::ArrowBuilder) {
        builder.append_value(&self.encode_to_vec());
    }
    /// Appends a null entry to `builder`.
    fn push_null(builder: &mut Self::ArrowBuilder) {
        builder.append_null()
    }

    /// Overwrites `self` with the value decoded from position `idx` of
    /// `column`. Panics if those bytes do not decode as a `MaelstromVal`.
    fn read(&mut self, idx: usize, column: &Self::ArrowColumn) {
        *self = MaelstromVal::decode(column.value(idx), &MaelstromValSchema)
            .expect("should be valid MaelstromVal");
    }
}
842
/// Stateless schema marker for [`MaelstromVal`] columns.
#[derive(Debug, PartialEq)]
pub struct MaelstromValSchema;
845
/// Wires [`MaelstromVal`] into the columnar `Schema` trait using the
/// `SimpleColumnar*` helpers; no column statistics are collected (`NoneStats`).
impl Schema<MaelstromVal> for MaelstromValSchema {
    type ArrowColumn = BinaryArray;
    type Statistics = NoneStats;

    type Decoder = SimpleColumnarDecoder<MaelstromVal>;
    type Encoder = SimpleColumnarEncoder<MaelstromVal>;

    /// Returns a default-constructed encoder; this never fails.
    fn encoder(&self) -> Result<Self::Encoder, anyhow::Error> {
        Ok(SimpleColumnarEncoder::default())
    }

    /// Wraps `col` in a decoder; this never fails.
    fn decoder(&self, col: Self::ArrowColumn) -> Result<Self::Decoder, anyhow::Error> {
        Ok(SimpleColumnarDecoder::new(col))
    }
}
861}