tiberius/tds/codec/token/
token_env_change.rs

1use crate::{tds::Collation, Error, SqlReadBytes};
2use byteorder::{LittleEndian, ReadBytesExt};
3use fmt::Debug;
4use futures_util::io::AsyncReadExt;
5use std::{
6    convert::TryFrom,
7    fmt,
8    io::{Cursor, Read},
9};
10
11uint_enum! {
12    #[repr(u8)]
13    pub enum EnvChangeTy {
14        Database = 1,
15        Language = 2,
16        CharacterSet = 3,
17        PacketSize = 4,
18        UnicodeDataSortingLID = 5,
19        UnicodeDataSortingCFL = 6,
20        SqlCollation = 7,
21        /// below here: >= TDSv7.2
22        BeginTransaction = 8,
23        CommitTransaction = 9,
24        RollbackTransaction = 10,
25        EnlistDTCTransaction = 11,
26        DefectTransaction = 12,
27        Rtls = 13,
28        PromoteTransaction = 15,
29        TransactionManagerAddress = 16,
30        TransactionEnded = 17,
31        ResetConnection = 18,
32        UserName = 19,
33        /// below here: TDS v7.4
34        Routing = 20,
35    }
36}
37
38impl fmt::Display for EnvChangeTy {
39    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
40        match self {
41            EnvChangeTy::Database => write!(f, "Database"),
42            EnvChangeTy::Language => write!(f, "Language"),
43            EnvChangeTy::CharacterSet => write!(f, "CharacterSet"),
44            EnvChangeTy::PacketSize => write!(f, "PacketSize"),
45            EnvChangeTy::UnicodeDataSortingLID => write!(f, "UnicodeDataSortingLID"),
46            EnvChangeTy::UnicodeDataSortingCFL => write!(f, "UnicodeDataSortingCFL"),
47            EnvChangeTy::SqlCollation => write!(f, "SqlCollation"),
48            EnvChangeTy::BeginTransaction => write!(f, "BeginTransaction"),
49            EnvChangeTy::CommitTransaction => write!(f, "CommitTransaction"),
50            EnvChangeTy::RollbackTransaction => write!(f, "RollbackTransaction"),
51            EnvChangeTy::EnlistDTCTransaction => write!(f, "EnlistDTCTransaction"),
52            EnvChangeTy::DefectTransaction => write!(f, "DefectTransaction"),
53            EnvChangeTy::Rtls => write!(f, "RTLS"),
54            EnvChangeTy::PromoteTransaction => write!(f, "PromoteTransaction"),
55            EnvChangeTy::TransactionManagerAddress => write!(f, "TransactionManagerAddress"),
56            EnvChangeTy::TransactionEnded => write!(f, "TransactionEnded"),
57            EnvChangeTy::ResetConnection => write!(f, "ResetConnection"),
58            EnvChangeTy::UserName => write!(f, "UserName"),
59            EnvChangeTy::Routing => write!(f, "Routing"),
60        }
61    }
62}
63
64#[derive(Debug)]
65pub enum TokenEnvChange {
66    Database(String, String),
67    PacketSize(u32, u32),
68    SqlCollation {
69        old: Option<Collation>,
70        new: Option<Collation>,
71    },
72    BeginTransaction([u8; 8]),
73    CommitTransaction,
74    RollbackTransaction,
75    DefectTransaction,
76    Routing {
77        host: String,
78        port: u16,
79    },
80    ChangeMirror(String),
81    Ignored(EnvChangeTy),
82}
83
84impl fmt::Display for TokenEnvChange {
85    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
86        match self {
87            Self::Database(ref old, ref new) => {
88                write!(f, "Database change from '{}' to '{}'", old, new)
89            }
90            Self::PacketSize(old, new) => {
91                write!(f, "Packet size change from '{}' to '{}'", old, new)
92            }
93            Self::SqlCollation { old, new } => match (old, new) {
94                (Some(old), Some(new)) => write!(f, "SQL collation change from {} to {}", old, new),
95                (_, Some(new)) => write!(f, "SQL collation changed to {}", new),
96                (_, _) => write!(f, "SQL collation change"),
97            },
98            Self::BeginTransaction(_) => write!(f, "Begin transaction"),
99            Self::CommitTransaction => write!(f, "Commit transaction"),
100            Self::RollbackTransaction => write!(f, "Rollback transaction"),
101            Self::DefectTransaction => write!(f, "Defect transaction"),
102            Self::Routing { host, port } => write!(
103                f,
104                "Server requested routing to a new address: {}:{}",
105                host, port
106            ),
107            Self::ChangeMirror(ref mirror) => write!(f, "Fallback mirror server: `{}`", mirror),
108            Self::Ignored(ty) => write!(f, "Ignored env change: `{}`", ty),
109        }
110    }
111}
112
113impl TokenEnvChange {
114    pub(crate) async fn decode<R>(src: &mut R) -> crate::Result<Self>
115    where
116        R: SqlReadBytes + Unpin,
117    {
118        let len = src.read_u16_le().await? as usize;
119
120        // We read all the bytes now, due to whatever environment change tokens
121        // we read, they might contain padding zeroes in the end we must
122        // discard.
123        let mut bytes = vec![0; len];
124        src.read_exact(&mut bytes[0..len]).await?;
125
126        let mut buf = Cursor::new(bytes);
127        let ty_byte = buf.read_u8()?;
128
129        let ty = EnvChangeTy::try_from(ty_byte)
130            .map_err(|_| Error::Protocol(format!("invalid envchange type {:x}", ty_byte).into()))?;
131
132        let token = match ty {
133            EnvChangeTy::Database => {
134                let len = buf.read_u8()? as usize;
135                let mut bytes = vec![0; len];
136
137                for item in bytes.iter_mut().take(len) {
138                    *item = buf.read_u16::<LittleEndian>()?;
139                }
140
141                let new_value = String::from_utf16(&bytes[..])?;
142
143                let len = buf.read_u8()? as usize;
144                let mut bytes = vec![0; len];
145
146                for item in bytes.iter_mut().take(len) {
147                    *item = buf.read_u16::<LittleEndian>()?;
148                }
149
150                let old_value = String::from_utf16(&bytes[..])?;
151
152                TokenEnvChange::Database(new_value, old_value)
153            }
154            EnvChangeTy::PacketSize => {
155                let len = buf.read_u8()? as usize;
156                let mut bytes = vec![0; len];
157
158                for item in bytes.iter_mut().take(len) {
159                    *item = buf.read_u16::<LittleEndian>()?;
160                }
161
162                let new_value = String::from_utf16(&bytes[..])?;
163
164                let len = buf.read_u8()? as usize;
165                let mut bytes = vec![0; len];
166
167                for item in bytes.iter_mut().take(len) {
168                    *item = buf.read_u16::<LittleEndian>()?;
169                }
170
171                let old_value = String::from_utf16(&bytes[..])?;
172
173                TokenEnvChange::PacketSize(new_value.parse()?, old_value.parse()?)
174            }
175            EnvChangeTy::SqlCollation => {
176                let len = buf.read_u8()? as usize;
177                let mut new_value = vec![0; len];
178                buf.read_exact(&mut new_value[0..len])?;
179
180                let new = if len == 5 {
181                    let new_sortid = new_value[4];
182                    let new_info = u32::from_le_bytes([
183                        new_value[0],
184                        new_value[1],
185                        new_value[2],
186                        new_value[3],
187                    ]);
188
189                    Some(Collation::new(new_info, new_sortid))
190                } else {
191                    None
192                };
193
194                let len = buf.read_u8()? as usize;
195                let mut old_value = vec![0; len];
196                buf.read_exact(&mut old_value[0..len])?;
197
198                let old = if len == 5 {
199                    let old_sortid = old_value[4];
200                    let old_info = u32::from_le_bytes([
201                        old_value[0],
202                        old_value[1],
203                        old_value[2],
204                        old_value[3],
205                    ]);
206
207                    Some(Collation::new(old_info, old_sortid))
208                } else {
209                    None
210                };
211
212                TokenEnvChange::SqlCollation { new, old }
213            }
214            EnvChangeTy::BeginTransaction | EnvChangeTy::EnlistDTCTransaction => {
215                let len = buf.read_u8()?;
216                assert!(len == 8);
217
218                let mut desc = [0; 8];
219                buf.read_exact(&mut desc)?;
220
221                TokenEnvChange::BeginTransaction(desc)
222            }
223
224            EnvChangeTy::CommitTransaction => TokenEnvChange::CommitTransaction,
225            EnvChangeTy::RollbackTransaction => TokenEnvChange::RollbackTransaction,
226            EnvChangeTy::DefectTransaction => TokenEnvChange::DefectTransaction,
227
228            EnvChangeTy::Routing => {
229                buf.read_u16::<LittleEndian>()?; // routing data value length
230                buf.read_u8()?; // routing protocol, always 0 (tcp)
231
232                let port = buf.read_u16::<LittleEndian>()?;
233
234                let len = buf.read_u16::<LittleEndian>()? as usize; // hostname string length
235                let mut bytes = vec![0; len];
236
237                for item in bytes.iter_mut().take(len) {
238                    *item = buf.read_u16::<LittleEndian>()?;
239                }
240
241                let host = String::from_utf16(&bytes[..])?;
242
243                TokenEnvChange::Routing { host, port }
244            }
245            EnvChangeTy::Rtls => {
246                let len = buf.read_u8()? as usize;
247                let mut bytes = vec![0; len];
248
249                for item in bytes.iter_mut().take(len) {
250                    *item = buf.read_u16::<LittleEndian>()?;
251                }
252
253                let mirror_name = String::from_utf16(&bytes[..])?;
254
255                TokenEnvChange::ChangeMirror(mirror_name)
256            }
257            ty => TokenEnvChange::Ignored(ty),
258        };
259
260        Ok(token)
261    }
262}