1use std::collections::BTreeMap;
14use std::sync::Arc;
15use std::time::{Duration, SystemTime, UNIX_EPOCH};
16
17use async_trait::async_trait;
18use differential_dataflow::consolidation::consolidate_updates;
19use differential_dataflow::lattice::Lattice;
20use mz_dyncfg::ConfigUpdates;
21use mz_ore::metrics::MetricsRegistry;
22use mz_ore::now::SYSTEM_TIME;
23use mz_persist::cfg::{BlobConfig, ConsensusConfig};
24use mz_persist::location::{Blob, Consensus, ExternalError};
25use mz_persist::unreliable::{UnreliableBlob, UnreliableConsensus, UnreliableHandle};
26use mz_persist_client::async_runtime::IsolatedRuntime;
27use mz_persist_client::cache::StateCache;
28use mz_persist_client::cfg::PersistConfig;
29use mz_persist_client::critical::SinceHandle;
30use mz_persist_client::metrics::Metrics;
31use mz_persist_client::read::{Listen, ListenEvent};
32use mz_persist_client::rpc::PubSubClientConnection;
33use mz_persist_client::write::WriteHandle;
34use mz_persist_client::{Diagnostics, PersistClient, ShardId};
35use timely::PartialOrder;
36use timely::order::TotalOrder;
37use timely::progress::{Antichain, Timestamp};
38use tokio::sync::Mutex;
39use tracing::{debug, info, trace};
40
41use crate::maelstrom::Args;
42use crate::maelstrom::api::{Body, ErrorCode, MaelstromError, NodeId, ReqTxnOp, ResTxnOp};
43use crate::maelstrom::node::{Handle, Service};
44use crate::maelstrom::services::{CachingBlob, MaelstromBlob, MaelstromConsensus};
45use crate::maelstrom::txn_list_append_single::codec_impls::{
46 MaelstromKeySchema, MaelstromValSchema,
47};
48
49#[derive(Debug, Clone, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
51pub struct MaelstromKey(u64);
52
53#[derive(Debug, Clone, Default, PartialEq, Eq, PartialOrd, Ord)]
55pub struct MaelstromVal(Vec<u64>);
56
57#[derive(Debug)]
103pub struct Transactor {
104 cads_token: u64,
105 shard_id: ShardId,
106 client: PersistClient,
107 since: SinceHandle<MaelstromKey, MaelstromVal, u64, i64, u64>,
108 write: WriteHandle<MaelstromKey, MaelstromVal, u64, i64>,
109
110 read_ts: u64,
111
112 long_lived_updates: Vec<((MaelstromKey, MaelstromVal), u64, i64)>,
116 long_lived_listen: Listen<MaelstromKey, MaelstromVal, u64, i64>,
117}
118
119impl Transactor {
120 pub async fn new(
121 client: &PersistClient,
122 node_id: NodeId,
123 shard_id: ShardId,
124 ) -> Result<Self, MaelstromError> {
125 let cads_token = node_id
126 .0
127 .trim_start_matches('n')
128 .parse::<u64>()
129 .expect("maelstrom node_id should be n followed by an integer");
130
131 let (mut write, mut read) = client
132 .open(
133 shard_id,
134 Arc::new(MaelstromKeySchema),
135 Arc::new(MaelstromValSchema),
136 Diagnostics::from_purpose("maelstrom long-lived"),
137 true,
138 )
139 .await?;
140 let since = client
143 .open_critical_since(
144 shard_id,
145 PersistClient::CONTROLLER_CRITICAL_SINCE,
146 Diagnostics::from_purpose("maelstrom since"),
147 )
148 .await?;
149 let read_ts = Self::maybe_init_shard(&mut write).await?;
150
151 let mut long_lived_updates = Vec::new();
152 let as_of = since.since().clone();
153 let mut updates = read
154 .snapshot_and_fetch(as_of.clone())
155 .await
156 .expect("as_of unexpectedly unavailable");
157 long_lived_updates.append(&mut updates);
158 let long_lived_listen = read
159 .listen(as_of.clone())
160 .await
161 .expect("as_of unexpectedly unavailable");
162
163 Ok(Transactor {
164 client: client.clone(),
165 cads_token,
166 shard_id,
167 since,
168 write,
169 read_ts,
170 long_lived_updates,
171 long_lived_listen,
172 })
173 }
174
175 async fn maybe_init_shard(
178 write: &mut WriteHandle<MaelstromKey, MaelstromVal, u64, i64>,
179 ) -> Result<u64, MaelstromError> {
180 debug!("Transactor::maybe_init");
181 const EMPTY_UPDATES: &[((MaelstromKey, MaelstromVal), u64, i64)] = &[];
182 let ts_min = u64::minimum();
183 let initial_upper = Antichain::from_elem(ts_min);
184 let new_upper = Antichain::from_elem(ts_min + 1);
185 let cas_res = write
186 .compare_and_append(EMPTY_UPDATES, initial_upper.clone(), new_upper.clone())
187 .await;
188 let read_ts = match cas_res? {
189 Ok(()) => 0,
190 Err(mismatch) => Self::extract_ts(&mismatch.current)? - 1,
191 };
192 Ok(read_ts)
193 }
194
195 pub async fn transact(
196 &mut self,
197 req_ops: &[ReqTxnOp],
198 ) -> Result<Vec<ResTxnOp>, MaelstromError> {
199 loop {
200 trace!("transact req={:?}", req_ops);
201 let state = self.read().await?;
202 debug!("transact req={:?} state={:?}", req_ops, state);
203 let (writes, res_ops) = Self::eval_txn(&state, req_ops);
204
205 let write_ts = self.read_ts + 1;
208 let updates = writes
209 .into_iter()
210 .map(|(k, v, diff)| ((k, v), write_ts, diff));
211 let expected_upper = Antichain::from_elem(write_ts);
212 let new_upper = Antichain::from_elem(write_ts + 1);
213 let cas_res = self
214 .write
215 .compare_and_append(updates, expected_upper.clone(), new_upper)
216 .await?;
217 match cas_res {
218 Ok(()) => {
219 self.read_ts = write_ts;
220 self.advance_since().await?;
221 return Ok(res_ops);
222 }
223 Err(mismatch) => {
225 info!(
226 "transact lost the CaS race, retrying: {:?} vs {:?}",
227 mismatch.expected, mismatch.current
228 );
229 self.read_ts = Self::extract_ts(&mismatch.current)? - 1;
230 continue;
231 }
232 }
233 }
234 }
235
236 async fn read_short_lived(
237 &mut self,
238 ) -> Result<
239 (
240 Vec<((MaelstromKey, MaelstromVal), u64, i64)>,
241 Antichain<u64>,
242 ),
243 MaelstromError,
244 > {
245 loop {
246 let as_of = Antichain::from_elem(self.read_ts);
250 let since_ts = Self::extract_ts(self.since.since())?;
251 assert!(self.read_ts >= since_ts, "{} vs {}", self.read_ts, since_ts);
252 let snap_ts = since_ts + (self.read_ts - since_ts) / 2;
253 let snap_as_of = Antichain::from_elem(snap_ts);
254
255 let mut read = self
258 .client
259 .open_leased_reader(
260 self.shard_id,
261 Arc::new(MaelstromKeySchema),
262 Arc::new(MaelstromValSchema),
263 Diagnostics::from_purpose("maelstrom short-lived"),
264 true,
265 )
266 .await
267 .expect("codecs should match");
268
269 if !PartialOrder::less_equal(read.since(), &snap_as_of) {
270 info!(
273 "read handle since {:?} is not before as_of {}, fetching a new read_ts and trying again",
274 read.since().elements(),
275 snap_ts,
276 );
277 self.advance_since().await?;
278
279 let recent_upper = self.write.fetch_recent_upper().await;
280 self.read_ts = Self::extract_ts(recent_upper)? - 1;
281 continue;
282 }
283
284 let updates_res = read.snapshot_and_fetch(snap_as_of.clone()).await;
285 let mut updates = match updates_res {
286 Ok(x) => x,
287 Err(since) => {
288 info!(
300 "snapshot cannot serve requested as_of {} since is {:?}, fetching a new read_ts and trying again",
301 snap_ts,
302 since.0.as_option()
303 );
304 self.advance_since().await?;
305
306 let recent_upper = self.write.fetch_recent_upper().await;
307 self.read_ts = Self::extract_ts(recent_upper)? - 1;
308 continue;
309 }
310 };
311
312 let listen_res = read.listen(snap_as_of).await;
313 let listen = match listen_res {
314 Ok(x) => x,
315 Err(since) => {
316 info!(
328 "listen cannot serve requested as_of {} since is {:?}, fetching a new read_ts and trying again",
329 snap_ts,
330 since.0.as_option(),
331 );
332 self.advance_since().await?;
333 let recent_upper = self.write.fetch_recent_upper().await;
334 self.read_ts = Self::extract_ts(recent_upper)? - 1;
335 assert!(
336 PartialOrder::less_than(self.since.since(), recent_upper),
337 "invariant: since {:?} should be held behind the recent upper {:?}",
338 &**self.since.since(),
339 &**recent_upper
340 );
341 continue;
342 }
343 };
344
345 trace!(
346 "read updates from snapshot as_of {}: {:?}",
347 snap_ts, updates
348 );
349 let listen_updates = Self::listen_through(listen, &as_of).await?;
350 trace!(
351 "read updates from listener as_of {} through {}: {:?}",
352 snap_ts, self.read_ts, listen_updates
353 );
354 updates.extend(listen_updates);
355
356 for (_, t, _) in updates.iter_mut() {
358 t.advance_by(as_of.borrow());
359 }
360 consolidate_updates(&mut updates);
361 return Ok((updates, as_of));
362 }
363 }
364
365 async fn read(&mut self) -> Result<BTreeMap<MaelstromKey, MaelstromVal>, MaelstromError> {
366 let (updates, as_of) = self.read_short_lived().await?;
367
368 let long_lived = self.read_long_lived(&as_of).await;
369 assert_eq!(&updates, &long_lived);
370
371 Self::extract_state_map(self.read_ts, updates)
372 }
373
374 async fn listen_through(
375 mut listen: Listen<MaelstromKey, MaelstromVal, u64, i64>,
376 frontier: &Antichain<u64>,
377 ) -> Result<Vec<((MaelstromKey, MaelstromVal), u64, i64)>, ExternalError> {
378 let mut ret = Vec::new();
379 loop {
380 for event in listen.fetch_next().await {
381 match event {
382 ListenEvent::Progress(x) => {
383 if PartialOrder::less_than(frontier, &x) {
387 return Ok(ret);
388 }
389 }
390 ListenEvent::Updates(x) => {
391 ret.extend(x.into_iter().filter(|(_, ts, _)| !frontier.less_than(ts)));
394 }
395 }
396 }
397 }
398 }
399
400 async fn read_long_lived(
401 &mut self,
402 as_of: &Antichain<u64>,
403 ) -> Vec<((MaelstromKey, MaelstromVal), u64, i64)> {
404 while PartialOrder::less_equal(self.long_lived_listen.frontier(), as_of) {
405 for event in self.long_lived_listen.fetch_next().await {
406 match event {
407 ListenEvent::Updates(mut updates) => {
408 self.long_lived_updates.append(&mut updates)
409 }
410 ListenEvent::Progress(_) => {} }
412 }
413 }
414 for (_, t, _) in self.long_lived_updates.iter_mut() {
415 t.advance_by(as_of.borrow());
416 }
417 consolidate_updates(&mut self.long_lived_updates);
418
419 self.long_lived_updates
426 .iter()
427 .filter(|(_, t, _)| !as_of.less_than(t))
428 .cloned()
429 .collect()
430 }
431
432 fn extract_state_map(
433 read_ts: u64,
434 updates: Vec<((MaelstromKey, MaelstromVal), u64, i64)>,
435 ) -> Result<BTreeMap<MaelstromKey, MaelstromVal>, MaelstromError> {
436 let mut ret = BTreeMap::new();
437 for ((k, v), _, d) in updates {
438 if d != 1 {
439 return Err(MaelstromError {
440 code: ErrorCode::Crash,
441 text: format!("invalid read at time {}", read_ts),
442 });
443 }
444 if ret.contains_key(&k) {
445 return Err(MaelstromError {
446 code: ErrorCode::Crash,
447 text: format!("unexpected duplicate key {:?}", k),
448 });
449 }
450 ret.insert(k, v);
451 }
452 Ok(ret)
453 }
454
455 fn eval_txn(
456 state: &BTreeMap<MaelstromKey, MaelstromVal>,
457 req_ops: &[ReqTxnOp],
458 ) -> (Vec<(MaelstromKey, MaelstromVal, i64)>, Vec<ResTxnOp>) {
459 let mut res_ops = Vec::new();
460 let mut updates = Vec::new();
461 let mut txn_state = BTreeMap::new();
462
463 for req_op in req_ops.iter() {
464 match req_op {
465 ReqTxnOp::Read { key } => {
466 let current = txn_state
467 .get(&MaelstromKey(*key))
468 .or_else(|| state.get(&MaelstromKey(*key)));
469 let val = current.cloned().unwrap_or_default().0;
470 res_ops.push(ResTxnOp::Read { key: *key, val })
471 }
472 ReqTxnOp::Append { key, val } => {
473 let current = txn_state
474 .get(&MaelstromKey(*key))
475 .or_else(|| state.get(&MaelstromKey(*key)));
476 let mut vals = match current {
477 Some(val) => {
478 updates.push((MaelstromKey(*key), val.clone(), -1));
480 val.clone()
481 }
482 None => MaelstromVal::default(),
483 };
484 vals.0.push(*val);
485 txn_state.insert(MaelstromKey(*key), vals.clone());
486 updates.push((MaelstromKey(*key), vals, 1));
487 res_ops.push(ResTxnOp::Append {
488 key: key.clone(),
489 val: *val,
490 })
491 }
492 }
493 }
494
495 debug!(
496 "eval_txn\n req={:?}\n res={:?}\n updates={:?}\n state={:?}\n txn_state={:?}",
497 req_ops, res_ops, updates, state, txn_state
498 );
499 (updates, res_ops)
500 }
501
502 async fn advance_since(&mut self) -> Result<(), MaelstromError> {
503 const SINCE_LAG: u64 = 10;
505 let new_since = Antichain::from_elem(self.read_ts.saturating_sub(SINCE_LAG));
506
507 let mut expected_token = self.cads_token;
508 loop {
509 let res = self
510 .since
511 .maybe_compare_and_downgrade_since(&expected_token, (&self.cads_token, &new_since))
512 .await;
513 match res {
514 Some(Ok(latest_since)) => {
515 let since_ts = Self::extract_ts(&latest_since)?;
518 if since_ts > self.read_ts {
519 info!(
520 "since was last updated by {}, forwarding our read_ts from {} to {}",
521 expected_token, self.read_ts, since_ts
522 );
523 self.read_ts = since_ts;
524 }
525
526 return Ok(());
527 }
528 Some(Err(actual_token)) => {
529 debug!(
530 "actual downgrade_since token {} didn't match expected {}, retrying",
531 actual_token, expected_token,
532 );
533 expected_token = actual_token;
534 }
535 None => {
536 panic!("should not no-op `maybe_compare_and_downgrade_since` during testing");
537 }
538 }
539 }
540 }
541
542 fn extract_ts<T: TotalOrder + Copy>(frontier: &Antichain<T>) -> Result<T, MaelstromError> {
543 frontier.as_option().copied().ok_or_else(|| MaelstromError {
544 code: ErrorCode::Crash,
545 text: "shard unexpectedly closed".into(),
546 })
547 }
548}
549
550#[derive(Debug)]
552pub struct TransactorService(pub Arc<Mutex<Transactor>>);
553
554#[async_trait]
555impl Service for TransactorService {
556 async fn init(args: &Args, handle: &Handle) -> Result<Self, MaelstromError> {
557 let shard_id = handle.maybe_init_shard_id().await?;
561
562 let seed: u64 = SystemTime::now()
565 .duration_since(UNIX_EPOCH)
566 .unwrap_or_default()
567 .subsec_nanos()
568 .into();
569 let should_happen = 1.0 - args.unreliability;
572 let should_timeout = args.unreliability;
578 let unreliable = UnreliableHandle::new(seed, should_happen, should_timeout);
581
582 let mut config =
583 PersistConfig::new_default_configs(&mz_persist_client::BUILD_INFO, SYSTEM_TIME.clone());
584 {
585 let mut updates = ConfigUpdates::default();
588 updates.add(&mz_persist::postgres::USE_POSTGRES_TUNED_QUERIES, true);
589 config.apply_from(&updates);
590 }
591
592 let metrics = Arc::new(Metrics::new(&config, &MetricsRegistry::new()));
593
594 let blob = match &args.blob_uri {
596 Some(blob_uri) => {
597 let cfg = BlobConfig::try_from(
598 blob_uri,
599 Box::new(config.clone()),
600 metrics.s3_blob.clone(),
601 Arc::clone(&config.configs),
602 )
603 .await
604 .expect("blob_uri should be valid");
605 loop {
606 match cfg.clone().open().await {
607 Ok(x) => break x,
608 Err(err) => {
609 info!("failed to open blob, trying again: {}", err);
610 }
611 }
612 }
613 }
614 None => MaelstromBlob::new(handle.clone()),
615 };
616 let blob: Arc<dyn Blob> = Arc::new(UnreliableBlob::new(blob, unreliable.clone()));
617 let blob = CachingBlob::new(blob);
625 config.critical_downgrade_interval = Duration::from_secs(0);
628 config.set_state_versions_recent_live_diffs_limit(5);
630 let consensus = match &args.consensus_uri {
631 Some(consensus_uri) => {
632 let cfg = ConsensusConfig::try_from(
633 consensus_uri,
634 Box::new(config.clone()),
635 metrics.postgres_consensus.clone(),
636 Arc::clone(&config.configs),
637 )
638 .expect("consensus_uri should be valid");
639 loop {
640 match cfg.clone().open().await {
641 Ok(x) => break x,
642 Err(err) => {
643 info!("failed to open consensus, trying again: {}", err);
644 }
645 }
646 }
647 }
648 None => MaelstromConsensus::new(handle.clone()),
649 };
650 let consensus: Arc<dyn Consensus> =
651 Arc::new(UnreliableConsensus::new(consensus, unreliable));
652
653 let isolated_runtime = Arc::new(IsolatedRuntime::new_for_tests());
655 let pubsub_sender = PubSubClientConnection::noop().sender;
656 let shared_states = Arc::new(StateCache::new(
657 &config,
658 Arc::clone(&metrics),
659 Arc::clone(&pubsub_sender),
660 ));
661 let client = PersistClient::new(
662 config,
663 blob,
664 consensus,
665 metrics,
666 isolated_runtime,
667 shared_states,
668 pubsub_sender,
669 )?;
670 let transactor = Transactor::new(&client, handle.node_id(), shard_id).await?;
671 let service = TransactorService(Arc::new(Mutex::new(transactor)));
672 Ok(service)
673 }
674
675 async fn eval(&self, handle: Handle, src: NodeId, req: Body) {
676 match req {
677 Body::ReqTxn { msg_id, txn } => {
678 let in_reply_to = msg_id;
679 match self.0.lock().await.transact(&txn).await {
680 Ok(txn) => handle.send_res(src, |msg_id| Body::ResTxn {
681 msg_id,
682 in_reply_to,
683 txn,
684 }),
685 Err(MaelstromError { code, text }) => {
686 handle.send_res(src, |msg_id| Body::Error {
687 msg_id: Some(msg_id),
688 in_reply_to,
689 code,
690 text,
691 })
692 }
693 }
694 }
695 req => unimplemented!("unsupported req: {:?}", req),
696 }
697 }
698}
699
700mod codec_impls {
701 use arrow::array::{BinaryArray, BinaryBuilder, UInt64Array, UInt64Builder};
702 use arrow::datatypes::ToByteSlice;
703 use bytes::Bytes;
704 use mz_persist_types::Codec;
705 use mz_persist_types::codec_impls::{
706 SimpleColumnarData, SimpleColumnarDecoder, SimpleColumnarEncoder,
707 };
708 use mz_persist_types::columnar::Schema;
709 use mz_persist_types::stats::NoneStats;
710
711 use crate::maelstrom::txn_list_append_single::{MaelstromKey, MaelstromVal};
712
713 impl Codec for MaelstromKey {
714 type Storage = ();
715 type Schema = MaelstromKeySchema;
716
717 fn codec_name() -> String {
718 "MaelstromKey".into()
719 }
720
721 fn encode<B>(&self, buf: &mut B)
722 where
723 B: bytes::BufMut,
724 {
725 let bytes = serde_json::to_vec(&self.0).expect("failed to encode key");
726 buf.put(bytes.as_slice());
727 }
728
729 fn decode<'a>(buf: &'a [u8], _schema: &MaelstromKeySchema) -> Result<Self, String> {
730 Ok(MaelstromKey(
731 serde_json::from_slice(buf).map_err(|err| err.to_string())?,
732 ))
733 }
734
735 fn encode_schema(_schema: &Self::Schema) -> Bytes {
736 Bytes::new()
737 }
738
739 fn decode_schema(buf: &Bytes) -> Self::Schema {
740 assert_eq!(*buf, Bytes::new());
741 MaelstromKeySchema
742 }
743 }
744
745 impl SimpleColumnarData for MaelstromKey {
746 type ArrowBuilder = UInt64Builder;
747 type ArrowColumn = UInt64Array;
748
749 fn goodbytes(builder: &Self::ArrowBuilder) -> usize {
750 builder.values_slice().to_byte_slice().len()
751 }
752
753 fn push(&self, builder: &mut Self::ArrowBuilder) {
754 builder.append_value(self.0);
755 }
756 fn push_null(builder: &mut Self::ArrowBuilder) {
757 builder.append_null();
758 }
759
760 fn read(&mut self, idx: usize, column: &Self::ArrowColumn) {
761 *self = MaelstromKey(column.value(idx));
762 }
763 }
764
765 #[derive(Debug, PartialEq)]
766 pub struct MaelstromKeySchema;
767
768 impl Schema<MaelstromKey> for MaelstromKeySchema {
769 type ArrowColumn = UInt64Array;
770 type Statistics = NoneStats;
771
772 type Decoder = SimpleColumnarDecoder<MaelstromKey>;
773 type Encoder = SimpleColumnarEncoder<MaelstromKey>;
774
775 fn encoder(&self) -> Result<Self::Encoder, anyhow::Error> {
776 Ok(SimpleColumnarEncoder::default())
777 }
778
779 fn decoder(&self, col: Self::ArrowColumn) -> Result<Self::Decoder, anyhow::Error> {
780 Ok(SimpleColumnarDecoder::new(col))
781 }
782 }
783
784 impl Codec for MaelstromVal {
785 type Storage = ();
786 type Schema = MaelstromValSchema;
787
788 fn codec_name() -> String {
789 "MaelstromVal".into()
790 }
791
792 fn encode<B>(&self, buf: &mut B)
793 where
794 B: bytes::BufMut,
795 {
796 let bytes = serde_json::to_vec(&self.0).expect("failed to encode val");
797 buf.put(bytes.as_slice());
798 }
799
800 fn decode<'a>(buf: &'a [u8], _schema: &MaelstromValSchema) -> Result<Self, String> {
801 Ok(MaelstromVal(
802 serde_json::from_slice(buf).map_err(|err| err.to_string())?,
803 ))
804 }
805
806 fn encode_schema(_schema: &Self::Schema) -> Bytes {
807 Bytes::new()
808 }
809
810 fn decode_schema(buf: &Bytes) -> Self::Schema {
811 assert_eq!(*buf, Bytes::new());
812 MaelstromValSchema
813 }
814 }
815
816 impl SimpleColumnarData for MaelstromVal {
817 type ArrowBuilder = BinaryBuilder;
818 type ArrowColumn = BinaryArray;
819
820 fn goodbytes(builder: &Self::ArrowBuilder) -> usize {
821 builder.values_slice().to_byte_slice().len()
822 }
823
824 fn push(&self, builder: &mut Self::ArrowBuilder) {
825 builder.append_value(&self.encode_to_vec());
826 }
827 fn push_null(builder: &mut Self::ArrowBuilder) {
828 builder.append_null()
829 }
830
831 fn read(&mut self, idx: usize, column: &Self::ArrowColumn) {
832 *self = MaelstromVal::decode(column.value(idx), &MaelstromValSchema)
833 .expect("should be valid MaelstromVal");
834 }
835 }
836
837 #[derive(Debug, PartialEq)]
838 pub struct MaelstromValSchema;
839
840 impl Schema<MaelstromVal> for MaelstromValSchema {
841 type ArrowColumn = BinaryArray;
842 type Statistics = NoneStats;
843
844 type Decoder = SimpleColumnarDecoder<MaelstromVal>;
845 type Encoder = SimpleColumnarEncoder<MaelstromVal>;
846
847 fn encoder(&self) -> Result<Self::Encoder, anyhow::Error> {
848 Ok(SimpleColumnarEncoder::default())
849 }
850
851 fn decoder(&self, col: Self::ArrowColumn) -> Result<Self::Decoder, anyhow::Error> {
852 Ok(SimpleColumnarDecoder::new(col))
853 }
854 }
855}