tiberius/tds/codec/token/
token_env_change.rs
1use crate::{tds::Collation, Error, SqlReadBytes};
2use byteorder::{LittleEndian, ReadBytesExt};
3use fmt::Debug;
4use futures_util::io::AsyncReadExt;
5use std::{
6 convert::TryFrom,
7 fmt,
8 io::{Cursor, Read},
9};
10
11uint_enum! {
12 #[repr(u8)]
13 pub enum EnvChangeTy {
14 Database = 1,
15 Language = 2,
16 CharacterSet = 3,
17 PacketSize = 4,
18 UnicodeDataSortingLID = 5,
19 UnicodeDataSortingCFL = 6,
20 SqlCollation = 7,
21 BeginTransaction = 8,
23 CommitTransaction = 9,
24 RollbackTransaction = 10,
25 EnlistDTCTransaction = 11,
26 DefectTransaction = 12,
27 Rtls = 13,
28 PromoteTransaction = 15,
29 TransactionManagerAddress = 16,
30 TransactionEnded = 17,
31 ResetConnection = 18,
32 UserName = 19,
33 Routing = 20,
35 }
36}
37
38impl fmt::Display for EnvChangeTy {
39 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
40 match self {
41 EnvChangeTy::Database => write!(f, "Database"),
42 EnvChangeTy::Language => write!(f, "Language"),
43 EnvChangeTy::CharacterSet => write!(f, "CharacterSet"),
44 EnvChangeTy::PacketSize => write!(f, "PacketSize"),
45 EnvChangeTy::UnicodeDataSortingLID => write!(f, "UnicodeDataSortingLID"),
46 EnvChangeTy::UnicodeDataSortingCFL => write!(f, "UnicodeDataSortingCFL"),
47 EnvChangeTy::SqlCollation => write!(f, "SqlCollation"),
48 EnvChangeTy::BeginTransaction => write!(f, "BeginTransaction"),
49 EnvChangeTy::CommitTransaction => write!(f, "CommitTransaction"),
50 EnvChangeTy::RollbackTransaction => write!(f, "RollbackTransaction"),
51 EnvChangeTy::EnlistDTCTransaction => write!(f, "EnlistDTCTransaction"),
52 EnvChangeTy::DefectTransaction => write!(f, "DefectTransaction"),
53 EnvChangeTy::Rtls => write!(f, "RTLS"),
54 EnvChangeTy::PromoteTransaction => write!(f, "PromoteTransaction"),
55 EnvChangeTy::TransactionManagerAddress => write!(f, "TransactionManagerAddress"),
56 EnvChangeTy::TransactionEnded => write!(f, "TransactionEnded"),
57 EnvChangeTy::ResetConnection => write!(f, "ResetConnection"),
58 EnvChangeTy::UserName => write!(f, "UserName"),
59 EnvChangeTy::Routing => write!(f, "Routing"),
60 }
61 }
62}
63
64#[derive(Debug)]
65pub enum TokenEnvChange {
66 Database(String, String),
67 PacketSize(u32, u32),
68 SqlCollation {
69 old: Option<Collation>,
70 new: Option<Collation>,
71 },
72 BeginTransaction([u8; 8]),
73 CommitTransaction,
74 RollbackTransaction,
75 DefectTransaction,
76 Routing {
77 host: String,
78 port: u16,
79 },
80 ChangeMirror(String),
81 Ignored(EnvChangeTy),
82}
83
84impl fmt::Display for TokenEnvChange {
85 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
86 match self {
87 Self::Database(ref old, ref new) => {
88 write!(f, "Database change from '{}' to '{}'", old, new)
89 }
90 Self::PacketSize(old, new) => {
91 write!(f, "Packet size change from '{}' to '{}'", old, new)
92 }
93 Self::SqlCollation { old, new } => match (old, new) {
94 (Some(old), Some(new)) => write!(f, "SQL collation change from {} to {}", old, new),
95 (_, Some(new)) => write!(f, "SQL collation changed to {}", new),
96 (_, _) => write!(f, "SQL collation change"),
97 },
98 Self::BeginTransaction(_) => write!(f, "Begin transaction"),
99 Self::CommitTransaction => write!(f, "Commit transaction"),
100 Self::RollbackTransaction => write!(f, "Rollback transaction"),
101 Self::DefectTransaction => write!(f, "Defect transaction"),
102 Self::Routing { host, port } => write!(
103 f,
104 "Server requested routing to a new address: {}:{}",
105 host, port
106 ),
107 Self::ChangeMirror(ref mirror) => write!(f, "Fallback mirror server: `{}`", mirror),
108 Self::Ignored(ty) => write!(f, "Ignored env change: `{}`", ty),
109 }
110 }
111}
112
113impl TokenEnvChange {
114 pub(crate) async fn decode<R>(src: &mut R) -> crate::Result<Self>
115 where
116 R: SqlReadBytes + Unpin,
117 {
118 let len = src.read_u16_le().await? as usize;
119
120 let mut bytes = vec![0; len];
124 src.read_exact(&mut bytes[0..len]).await?;
125
126 let mut buf = Cursor::new(bytes);
127 let ty_byte = buf.read_u8()?;
128
129 let ty = EnvChangeTy::try_from(ty_byte)
130 .map_err(|_| Error::Protocol(format!("invalid envchange type {:x}", ty_byte).into()))?;
131
132 let token = match ty {
133 EnvChangeTy::Database => {
134 let len = buf.read_u8()? as usize;
135 let mut bytes = vec![0; len];
136
137 for item in bytes.iter_mut().take(len) {
138 *item = buf.read_u16::<LittleEndian>()?;
139 }
140
141 let new_value = String::from_utf16(&bytes[..])?;
142
143 let len = buf.read_u8()? as usize;
144 let mut bytes = vec![0; len];
145
146 for item in bytes.iter_mut().take(len) {
147 *item = buf.read_u16::<LittleEndian>()?;
148 }
149
150 let old_value = String::from_utf16(&bytes[..])?;
151
152 TokenEnvChange::Database(new_value, old_value)
153 }
154 EnvChangeTy::PacketSize => {
155 let len = buf.read_u8()? as usize;
156 let mut bytes = vec![0; len];
157
158 for item in bytes.iter_mut().take(len) {
159 *item = buf.read_u16::<LittleEndian>()?;
160 }
161
162 let new_value = String::from_utf16(&bytes[..])?;
163
164 let len = buf.read_u8()? as usize;
165 let mut bytes = vec![0; len];
166
167 for item in bytes.iter_mut().take(len) {
168 *item = buf.read_u16::<LittleEndian>()?;
169 }
170
171 let old_value = String::from_utf16(&bytes[..])?;
172
173 TokenEnvChange::PacketSize(new_value.parse()?, old_value.parse()?)
174 }
175 EnvChangeTy::SqlCollation => {
176 let len = buf.read_u8()? as usize;
177 let mut new_value = vec![0; len];
178 buf.read_exact(&mut new_value[0..len])?;
179
180 let new = if len == 5 {
181 let new_sortid = new_value[4];
182 let new_info = u32::from_le_bytes([
183 new_value[0],
184 new_value[1],
185 new_value[2],
186 new_value[3],
187 ]);
188
189 Some(Collation::new(new_info, new_sortid))
190 } else {
191 None
192 };
193
194 let len = buf.read_u8()? as usize;
195 let mut old_value = vec![0; len];
196 buf.read_exact(&mut old_value[0..len])?;
197
198 let old = if len == 5 {
199 let old_sortid = old_value[4];
200 let old_info = u32::from_le_bytes([
201 old_value[0],
202 old_value[1],
203 old_value[2],
204 old_value[3],
205 ]);
206
207 Some(Collation::new(old_info, old_sortid))
208 } else {
209 None
210 };
211
212 TokenEnvChange::SqlCollation { new, old }
213 }
214 EnvChangeTy::BeginTransaction | EnvChangeTy::EnlistDTCTransaction => {
215 let len = buf.read_u8()?;
216 assert!(len == 8);
217
218 let mut desc = [0; 8];
219 buf.read_exact(&mut desc)?;
220
221 TokenEnvChange::BeginTransaction(desc)
222 }
223
224 EnvChangeTy::CommitTransaction => TokenEnvChange::CommitTransaction,
225 EnvChangeTy::RollbackTransaction => TokenEnvChange::RollbackTransaction,
226 EnvChangeTy::DefectTransaction => TokenEnvChange::DefectTransaction,
227
228 EnvChangeTy::Routing => {
229 buf.read_u16::<LittleEndian>()?; buf.read_u8()?; let port = buf.read_u16::<LittleEndian>()?;
233
234 let len = buf.read_u16::<LittleEndian>()? as usize; let mut bytes = vec![0; len];
236
237 for item in bytes.iter_mut().take(len) {
238 *item = buf.read_u16::<LittleEndian>()?;
239 }
240
241 let host = String::from_utf16(&bytes[..])?;
242
243 TokenEnvChange::Routing { host, port }
244 }
245 EnvChangeTy::Rtls => {
246 let len = buf.read_u8()? as usize;
247 let mut bytes = vec![0; len];
248
249 for item in bytes.iter_mut().take(len) {
250 *item = buf.read_u16::<LittleEndian>()?;
251 }
252
253 let mirror_name = String::from_utf16(&bytes[..])?;
254
255 TokenEnvChange::ChangeMirror(mirror_name)
256 }
257 ty => TokenEnvChange::Ignored(ty),
258 };
259
260 Ok(token)
261 }
262}