1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Types for the persist-txn crate.

use std::fmt::Debug;

use serde::{Deserialize, Serialize};

use crate::stats::PartStats;
use crate::{Codec, Codec64, ShardId};

/// The in-mem representation of an update in the txns shard.
///
/// Every variant names the data shard it targets and carries its timestamp as
/// raw Codec64 encoded bytes. Use [TxnsEntry::data_id] and [TxnsEntry::ts] to
/// read those two fields without matching on the variant.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum TxnsEntry {
    /// A data shard register operation.
    ///
    /// The `[u8; 8]` is a Codec64 encoded timestamp.
    Register(ShardId, [u8; 8]),
    /// A batch written to a data shard in a txn.
    ///
    /// The `[u8; 8]` is a Codec64 encoded timestamp. The `Vec<u8>` is
    /// presumably the encoded batch itself (NOTE(review): not established by
    /// this file, confirm against the code that constructs `Append`).
    Append(ShardId, [u8; 8], Vec<u8>),
}

impl TxnsEntry {
    /// The id of the data shard that this entry targets.
    ///
    /// Both variants store the id as their first field, so a single
    /// or-pattern arm covers them.
    pub fn data_id(&self) -> &ShardId {
        match self {
            Self::Register(shard, _) | Self::Append(shard, _, _) => shard,
        }
    }

    /// The timestamp of this entry, decoded into the caller's chosen `T`.
    ///
    /// The entry only holds the 8 Codec64 encoded bytes; they are handed to
    /// `T::decode` on every call.
    pub fn ts<T: Codec64>(&self) -> T {
        let encoded: [u8; 8] = match self {
            Self::Register(_, bytes) | Self::Append(_, bytes, _) => *bytes,
        };
        T::decode(encoded)
    }
}

/// An abstraction over the encoding format of [TxnsEntry].
///
/// This enables users of this crate to control how data is written to the txns
/// shard (which will allow mz to present it as a normal introspection source).
///
/// None of the methods take `self`: the encoding is selected by the
/// implementing type, not by a value of that type.
pub trait TxnsCodec: Debug {
    /// The `K` type used in the txns shard.
    type Key: Debug + Codec + Default;
    /// The `V` type used in the txns shard.
    type Val: Debug + Codec + Default;

    /// Returns the Schemas to use with [Self::Key] and [Self::Val].
    ///
    /// The tuple is ordered `(key schema, val schema)`.
    fn schemas() -> (<Self::Key as Codec>::Schema, <Self::Val as Codec>::Schema);
    /// Encodes a [TxnsEntry] in the format persisted in the txns shard.
    ///
    /// Consumes the entry and returns the `(key, val)` pair to write.
    fn encode(e: TxnsEntry) -> (Self::Key, Self::Val);
    /// Decodes a [TxnsEntry] from the format persisted in the txns shard.
    ///
    /// Takes a `(key, val)` pair of the same types that [Self::encode]
    /// produces.
    ///
    /// # Panics
    ///
    /// Implementations should panic if the values are invalid.
    fn decode(key: Self::Key, val: Self::Val) -> TxnsEntry;

    /// Returns whether a part might include the given data shard based on
    /// pushdown stats.
    ///
    /// `data_id` is the data shard being looked for and `stats` are the
    /// pushdown stats of the part under consideration.
    ///
    /// False positives are okay (needless fetches) but false negatives are not
    /// (incorrectness). Returns an Option to make `?` convenient, `None` is
    /// treated the same as `Some(true)`.
    fn should_fetch_part(data_id: &ShardId, stats: &PartStats) -> Option<bool>;
}