mysql_async/conn/binlog_stream/
mod.rs
1use futures_core::ready;
10use mysql_common::{
11 binlog::{
12 consts::{BinlogVersion::Version4, EventType},
13 events::{Event, TableMapEvent, TransactionPayloadEvent},
14 EventStreamReader,
15 },
16 io::ParseBuf,
17 packets::{ComRegisterSlave, ErrPacket, NetworkStreamTerminator, OkPacketDeserializer},
18};
19
20use std::{
21 future::Future,
22 io::{Cursor, ErrorKind},
23 pin::Pin,
24 task::{Context, Poll},
25};
26
27use crate::{connection_like::Connection, queryable::Queryable};
28use crate::{error::DriverError, io::ReadPacket, Conn, Error, IoError, Result};
29
30use self::request::BinlogStreamRequest;
31
32pub mod request;
33
34impl super::Conn {
35 pub async fn get_binlog_stream(
40 mut self,
41 request: BinlogStreamRequest<'_>,
42 ) -> Result<BinlogStream> {
43 self.request_binlog(request).await?;
44
45 Ok(BinlogStream::new(self))
46 }
47
48 async fn register_as_slave(
49 &mut self,
50 com_register_slave: ComRegisterSlave<'_>,
51 ) -> crate::Result<()> {
52 self.query_drop("SET @master_binlog_checksum='ALL'").await?;
53 self.write_command(&com_register_slave).await?;
54
55 self.read_packet().await?;
57
58 Ok(())
59 }
60
61 async fn request_binlog(&mut self, request: BinlogStreamRequest<'_>) -> crate::Result<()> {
62 self.register_as_slave(request.register_slave).await?;
63 self.write_command(&request.binlog_request.as_cmd()).await?;
64 Ok(())
65 }
66}
67
68pub struct BinlogStream {
72 read_packet: ReadPacket<'static, 'static>,
73 esr: EventStreamReader,
74 tpe: Option<Cursor<Vec<u8>>>,
77}
78
79impl BinlogStream {
80 pub(super) fn new(conn: Conn) -> Self {
82 BinlogStream {
83 read_packet: ReadPacket::new(conn),
84 esr: EventStreamReader::new(Version4),
85 tpe: None,
86 }
87 }
88
89 pub fn get_tme(&self, table_id: u64) -> Option<&TableMapEvent<'static>> {
91 self.esr.get_tme(table_id)
92 }
93
94 pub async fn close(self) -> Result<()> {
97 match self.read_packet.0 {
98 Connection::Conn(conn) => {
101 if let Err(Error::Io(IoError::Io(ref error))) = conn.close_conn().await {
102 if error.kind() == ErrorKind::BrokenPipe {
105 return Ok(());
106 }
107 }
108 }
109 Connection::ConnMut(_) => {}
110 Connection::Tx(_) => {}
111 }
112
113 Ok(())
114 }
115}
116
117impl futures_core::stream::Stream for BinlogStream {
118 type Item = Result<Event>;
119
120 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
121 {
122 let Self {
123 ref mut tpe,
124 ref mut esr,
125 ..
126 } = *self;
127
128 if let Some(tpe) = tpe.as_mut() {
129 match esr.read_decompressed(tpe) {
130 Ok(Some(event)) => return Poll::Ready(Some(Ok(event))),
131 Ok(None) => self.tpe = None,
132 Err(err) => return Poll::Ready(Some(Err(err.into()))),
133 }
134 }
135 }
136
137 let packet = match ready!(Pin::new(&mut self.read_packet).poll(cx)) {
138 Ok(packet) => packet,
139 Err(err) => return Poll::Ready(Some(Err(err.into()))),
140 };
141
142 let first_byte = packet.first().copied();
143
144 if first_byte == Some(255) {
145 if let Ok(ErrPacket::Error(err)) =
146 ParseBuf(&packet).parse(self.read_packet.conn_ref().capabilities())
147 {
148 return Poll::Ready(Some(Err(From::from(err))));
149 }
150 }
151
152 if first_byte == Some(254)
153 && packet.len() < 8
154 && ParseBuf(&packet)
155 .parse::<OkPacketDeserializer<NetworkStreamTerminator>>(
156 self.read_packet.conn_ref().capabilities(),
157 )
158 .is_ok()
159 {
160 return Poll::Ready(None);
161 }
162
163 if first_byte == Some(0) {
164 let event_data = &packet[1..];
165 match self.esr.read(event_data) {
166 Ok(Some(event)) => {
167 if event.header().event_type_raw() == EventType::TRANSACTION_PAYLOAD_EVENT as u8
168 {
169 #[allow(clippy::single_match)]
170 match event.read_event::<TransactionPayloadEvent<'_>>() {
171 Ok(e) => self.tpe = Some(Cursor::new(e.danger_decompress())),
172 Err(_) => (),
173 }
174 }
175 Poll::Ready(Some(Ok(event)))
176 }
177 Ok(None) => Poll::Ready(None),
178 Err(err) => Poll::Ready(Some(Err(err.into()))),
179 }
180 } else {
181 Poll::Ready(Some(Err(DriverError::UnexpectedPacket {
182 payload: packet.to_vec(),
183 }
184 .into())))
185 }
186 }
187}
188
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use futures_util::StreamExt;
    use mysql_common::binlog::events::EventData;
    use tokio::time::timeout;

    use crate::prelude::*;
    use crate::{test_misc::get_opts, *};

    /// Produces binlog traffic: creates a table, inserts 100 rows in a single
    /// transaction and drops the table again.
    async fn gen_dummy_data(conn: &mut Conn) -> super::Result<()> {
        "CREATE TABLE IF NOT EXISTS customers (customer_id int not null)"
            .ignore(&mut *conn)
            .await?;

        let mut tx = conn.start_transaction(Default::default()).await?;
        for i in 0_u8..100 {
            "INSERT INTO customers(customer_id) VALUES (?)"
                .with((i,))
                .ignore(&mut tx)
                .await?;
        }
        tx.commit().await?;

        "DROP TABLE customers".ignore(conn).await?;

        Ok(())
    }

    /// Opens a connection (from `pool` if given, standalone otherwise), generates dummy
    /// binlog traffic on it and returns the connection together with the file name and
    /// position taken from the first row of `SHOW BINARY LOGS`.
    ///
    /// Panics if the server reports a `GTID_MODE` that does not start with `ON`.
    async fn create_binlog_stream_conn(pool: Option<&Pool>) -> super::Result<(Conn, Vec<u8>, u64)> {
        let mut conn = match pool {
            None => Conn::new(get_opts()).await.unwrap(),
            Some(pool) => pool.get_conn().await.unwrap(),
        };

        if conn.server_version() >= (8, 0, 31) && conn.server_version() < (9, 0, 0) {
            // Best effort: a failure to enable compression is deliberately ignored.
            let _ = "SET binlog_transaction_compression=ON"
                .ignore(&mut conn)
                .await;
        }

        if let Ok(Some(gtid_mode)) = "SELECT @@GLOBAL.GTID_MODE"
            .first::<String, _>(&mut conn)
            .await
        {
            if !gtid_mode.starts_with("ON") {
                panic!(
                    "GTID_MODE is disabled \
                        (enable using --gtid_mode=ON --enforce_gtid_consistency=ON)"
                );
            }
        }

        let row: crate::Row = "SHOW BINARY LOGS".first(&mut conn).await.unwrap().unwrap();
        let filename = row.get(0).unwrap();
        let position = row.get(1).unwrap();

        gen_dummy_data(&mut conn).await.unwrap();
        Ok((conn, filename, position))
    }

    /// Reads events from `binlog_stream` until it ends or stays silent for 10 seconds,
    /// validating each event on the way, and returns the number of events read.
    ///
    /// For every event this asserts that its type is known and, for rows events, that
    /// every row can be read using the matching table map event.
    async fn read_and_check_events(
        binlog_stream: &mut super::BinlogStream,
    ) -> super::Result<usize> {
        let mut events_num = 0_usize;
        while let Ok(Some(event)) = timeout(Duration::from_secs(10), binlog_stream.next()).await {
            let event = event.unwrap();
            events_num += 1;

            // The event type must be a known one.
            event.header().event_type().unwrap();

            // Every row of a rows event must be readable.
            if let EventData::RowsEvent(re) = event.read_data()?.unwrap() {
                let tme = binlog_stream.get_tme(re.table_id());
                for row in re.rows(tme.unwrap()) {
                    row.unwrap();
                }
            }
        }
        Ok(events_num)
    }

    #[tokio::test]
    async fn should_read_binlog() -> super::Result<()> {
        read_binlog_streams_and_close_their_connections(None, (12, 13, 14))
            .await
            .unwrap();

        let pool = Pool::new(get_opts());
        read_binlog_streams_and_close_their_connections(Some(&pool), (15, 16, 17))
            .await
            .unwrap();

        // The pool must still be able to disconnect after its connections were used
        // for (and closed by) binlog streams.
        timeout(Duration::from_secs(10), pool.disconnect())
            .await
            .unwrap()
            .unwrap();

        Ok(())
    }

    /// Reads the binlog through three differently configured streams (plain, GTID-based
    /// and non-blocking), using one server id from `binlog_server_ids` for each, and closes
    /// every stream afterwards. The GTID-based stream is skipped on MariaDB.
    async fn read_binlog_streams_and_close_their_connections(
        pool: Option<&Pool>,
        binlog_server_ids: (u32, u32, u32),
    ) -> super::Result<()> {
        // Plain stream.
        let (conn, filename, pos) = create_binlog_stream_conn(pool).await.unwrap();
        let is_mariadb = conn.inner.is_mariadb;

        let mut binlog_stream = conn
            .get_binlog_stream(
                BinlogStreamRequest::new(binlog_server_ids.0)
                    .with_filename(&filename)
                    .with_pos(pos),
            )
            .await
            .unwrap();

        let events_num = read_and_check_events(&mut binlog_stream).await?;
        assert!(events_num > 0);
        timeout(Duration::from_secs(10), binlog_stream.close())
            .await
            .unwrap()
            .unwrap();

        // GTID-based stream.
        if !is_mariadb {
            let (conn, filename, pos) = create_binlog_stream_conn(pool).await.unwrap();

            let mut binlog_stream = conn
                .get_binlog_stream(
                    BinlogStreamRequest::new(binlog_server_ids.1)
                        .with_gtid()
                        .with_filename(&filename)
                        .with_pos(pos),
                )
                .await
                .unwrap();

            let events_num = read_and_check_events(&mut binlog_stream).await?;
            assert!(events_num > 0);
            timeout(Duration::from_secs(10), binlog_stream.close())
                .await
                .unwrap()
                .unwrap();
        }

        // Non-blocking stream: it is read without a timeout, i.e. it is expected to
        // terminate on its own.
        let (conn, filename, pos) = create_binlog_stream_conn(pool).await.unwrap();

        let mut binlog_stream = conn
            .get_binlog_stream(
                BinlogStreamRequest::new(binlog_server_ids.2)
                    .with_filename(&filename)
                    .with_pos(pos)
                    .with_non_blocking(),
            )
            .await
            .unwrap();

        let mut events_num = 0;
        while let Some(event) = binlog_stream.next().await {
            let event = event.unwrap();
            events_num += 1;
            event.header().event_type().unwrap();
            event.read_data().unwrap();
        }
        assert!(events_num > 0);
        timeout(Duration::from_secs(10), binlog_stream.close())
            .await
            .unwrap()
            .unwrap();

        Ok(())
    }
}