mysql_async/conn/binlog_stream/
mod.rs

1// Copyright (c) 2020 Anatoly Ikorsky
2//
3// Licensed under the Apache License, Version 2.0
4// <LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0> or the MIT
5// license <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
6// option. All files in the project carrying such notice may not be copied,
7// modified, or distributed except according to those terms.
8
9use futures_core::ready;
10use mysql_common::{
11    binlog::{
12        consts::{BinlogVersion::Version4, EventType},
13        events::{Event, TableMapEvent, TransactionPayloadEvent},
14        EventStreamReader,
15    },
16    io::ParseBuf,
17    packets::{ComRegisterSlave, ErrPacket, NetworkStreamTerminator, OkPacketDeserializer},
18};
19
20use std::{
21    future::Future,
22    io::{Cursor, ErrorKind},
23    pin::Pin,
24    task::{Context, Poll},
25};
26
27use crate::{connection_like::Connection, queryable::Queryable};
28use crate::{error::DriverError, io::ReadPacket, Conn, Error, IoError, Result};
29
30use self::request::BinlogStreamRequest;
31
32pub mod request;
33
34impl super::Conn {
35    /// Turns this connection into a binlog stream.
36    ///
37    /// You can use SHOW BINARY LOGS to get the current logfile and position from the master.
38    /// If the request’s filename is empty, the server will send the binlog-stream of the first known binlog.
39    pub async fn get_binlog_stream(
40        mut self,
41        request: BinlogStreamRequest<'_>,
42    ) -> Result<BinlogStream> {
43        self.request_binlog(request).await?;
44
45        Ok(BinlogStream::new(self))
46    }
47
48    async fn register_as_slave(
49        &mut self,
50        com_register_slave: ComRegisterSlave<'_>,
51    ) -> crate::Result<()> {
52        self.query_drop("SET @master_binlog_checksum='ALL'").await?;
53        self.write_command(&com_register_slave).await?;
54
55        // Server will respond with OK.
56        self.read_packet().await?;
57
58        Ok(())
59    }
60
61    async fn request_binlog(&mut self, request: BinlogStreamRequest<'_>) -> crate::Result<()> {
62        self.register_as_slave(request.register_slave).await?;
63        self.write_command(&request.binlog_request.as_cmd()).await?;
64        Ok(())
65    }
66}
67
68/// Binlog event stream.
69///
70/// Stream initialization is lazy, i.e. binlog won't be requested until this stream is polled.
71pub struct BinlogStream {
72    read_packet: ReadPacket<'static, 'static>,
73    esr: EventStreamReader,
74    // TODO: Use 'static reader here (requires impl on the mysql_common side).
75    /// Uncompressed Transaction_payload_event we are iterating over (if any).
76    tpe: Option<Cursor<Vec<u8>>>,
77}
78
79impl BinlogStream {
80    /// `conn` is a `Conn` with `request_binlog` executed on it.
81    pub(super) fn new(conn: Conn) -> Self {
82        BinlogStream {
83            read_packet: ReadPacket::new(conn),
84            esr: EventStreamReader::new(Version4),
85            tpe: None,
86        }
87    }
88
89    /// Returns a table map event for the given table id.
90    pub fn get_tme(&self, table_id: u64) -> Option<&TableMapEvent<'static>> {
91        self.esr.get_tme(table_id)
92    }
93
94    /// Closes the stream's `Conn`. Additionally, the connection is dropped, so its associated
95    /// pool (if any) will regain a connection slot.
96    pub async fn close(self) -> Result<()> {
97        match self.read_packet.0 {
98            // `close_conn` requires ownership of `Conn`. That's okay, because
99            // `BinLogStream`'s connection is always owned.
100            Connection::Conn(conn) => {
101                if let Err(Error::Io(IoError::Io(ref error))) = conn.close_conn().await {
102                    // If the binlog was requested with the flag BINLOG_DUMP_NON_BLOCK,
103                    // the connection's file handler will already have been closed (EOF).
104                    if error.kind() == ErrorKind::BrokenPipe {
105                        return Ok(());
106                    }
107                }
108            }
109            Connection::ConnMut(_) => {}
110            Connection::Tx(_) => {}
111        }
112
113        Ok(())
114    }
115}
116
117impl futures_core::stream::Stream for BinlogStream {
118    type Item = Result<Event>;
119
120    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
121        {
122            let Self {
123                ref mut tpe,
124                ref mut esr,
125                ..
126            } = *self;
127
128            if let Some(tpe) = tpe.as_mut() {
129                match esr.read_decompressed(tpe) {
130                    Ok(Some(event)) => return Poll::Ready(Some(Ok(event))),
131                    Ok(None) => self.tpe = None,
132                    Err(err) => return Poll::Ready(Some(Err(err.into()))),
133                }
134            }
135        }
136
137        let packet = match ready!(Pin::new(&mut self.read_packet).poll(cx)) {
138            Ok(packet) => packet,
139            Err(err) => return Poll::Ready(Some(Err(err.into()))),
140        };
141
142        let first_byte = packet.first().copied();
143
144        if first_byte == Some(255) {
145            if let Ok(ErrPacket::Error(err)) =
146                ParseBuf(&packet).parse(self.read_packet.conn_ref().capabilities())
147            {
148                return Poll::Ready(Some(Err(From::from(err))));
149            }
150        }
151
152        if first_byte == Some(254)
153            && packet.len() < 8
154            && ParseBuf(&packet)
155                .parse::<OkPacketDeserializer<NetworkStreamTerminator>>(
156                    self.read_packet.conn_ref().capabilities(),
157                )
158                .is_ok()
159        {
160            return Poll::Ready(None);
161        }
162
163        if first_byte == Some(0) {
164            let event_data = &packet[1..];
165            match self.esr.read(event_data) {
166                Ok(Some(event)) => {
167                    if event.header().event_type_raw() == EventType::TRANSACTION_PAYLOAD_EVENT as u8
168                    {
169                        #[allow(clippy::single_match)]
170                        match event.read_event::<TransactionPayloadEvent<'_>>() {
171                            Ok(e) => self.tpe = Some(Cursor::new(e.danger_decompress())),
172                            Err(_) => (/* TODO: Log the error */),
173                        }
174                    }
175                    Poll::Ready(Some(Ok(event)))
176                }
177                Ok(None) => Poll::Ready(None),
178                Err(err) => Poll::Ready(Some(Err(err.into()))),
179            }
180        } else {
181            Poll::Ready(Some(Err(DriverError::UnexpectedPacket {
182                payload: packet.to_vec(),
183            }
184            .into())))
185        }
186    }
187}
188
189#[cfg(test)]
190mod tests {
191    use std::time::Duration;
192
193    use futures_util::StreamExt;
194    use mysql_common::binlog::events::EventData;
195    use tokio::time::timeout;
196
197    use crate::prelude::*;
198    use crate::{test_misc::get_opts, *};
199
200    async fn gen_dummy_data(conn: &mut Conn) -> super::Result<()> {
201        "CREATE TABLE IF NOT EXISTS customers (customer_id int not null)"
202            .ignore(&mut *conn)
203            .await?;
204
205        let mut tx = conn.start_transaction(Default::default()).await?;
206        for i in 0_u8..100 {
207            "INSERT INTO customers(customer_id) VALUES (?)"
208                .with((i,))
209                .ignore(&mut tx)
210                .await?;
211        }
212        tx.commit().await?;
213
214        "DROP TABLE customers".ignore(conn).await?;
215
216        Ok(())
217    }
218
219    async fn create_binlog_stream_conn(pool: Option<&Pool>) -> super::Result<(Conn, Vec<u8>, u64)> {
220        let mut conn = match pool {
221            None => Conn::new(get_opts()).await.unwrap(),
222            Some(pool) => pool.get_conn().await.unwrap(),
223        };
224
225        if conn.server_version() >= (8, 0, 31) && conn.server_version() < (9, 0, 0) {
226            let _ = "SET binlog_transaction_compression=ON"
227                .ignore(&mut conn)
228                .await;
229        }
230
231        if let Ok(Some(gtid_mode)) = "SELECT @@GLOBAL.GTID_MODE"
232            .first::<String, _>(&mut conn)
233            .await
234        {
235            if !gtid_mode.starts_with("ON") {
236                panic!(
237                    "GTID_MODE is disabled \
238                        (enable using --gtid_mode=ON --enforce_gtid_consistency=ON)"
239                );
240            }
241        }
242
243        let row: crate::Row = "SHOW BINARY LOGS".first(&mut conn).await.unwrap().unwrap();
244        let filename = row.get(0).unwrap();
245        let position = row.get(1).unwrap();
246
247        gen_dummy_data(&mut conn).await.unwrap();
248        Ok((conn, filename, position))
249    }
250
251    #[tokio::test]
252    async fn should_read_binlog() -> super::Result<()> {
253        read_binlog_streams_and_close_their_connections(None, (12, 13, 14))
254            .await
255            .unwrap();
256
257        let pool = Pool::new(get_opts());
258        read_binlog_streams_and_close_their_connections(Some(&pool), (15, 16, 17))
259            .await
260            .unwrap();
261
262        // Disconnecting the pool verifies that closing the binlog connections
263        // left the pool in a sane state.
264        timeout(Duration::from_secs(10), pool.disconnect())
265            .await
266            .unwrap()
267            .unwrap();
268
269        Ok(())
270    }
271
272    async fn read_binlog_streams_and_close_their_connections(
273        pool: Option<&Pool>,
274        binlog_server_ids: (u32, u32, u32),
275    ) -> super::Result<()> {
276        // iterate using COM_BINLOG_DUMP
277        let (conn, filename, pos) = create_binlog_stream_conn(pool).await.unwrap();
278        let is_mariadb = conn.inner.is_mariadb;
279
280        let mut binlog_stream = conn
281            .get_binlog_stream(
282                BinlogStreamRequest::new(binlog_server_ids.0)
283                    .with_filename(&filename)
284                    .with_pos(pos),
285            )
286            .await
287            .unwrap();
288
289        let mut events_num = 0;
290        while let Ok(Some(event)) = timeout(Duration::from_secs(10), binlog_stream.next()).await {
291            let event = event.unwrap();
292            events_num += 1;
293
294            // assert that event type is known
295            event.header().event_type().unwrap();
296
297            // iterate over rows of an event
298            if let EventData::RowsEvent(re) = event.read_data()?.unwrap() {
299                let tme = binlog_stream.get_tme(re.table_id());
300                for row in re.rows(tme.unwrap()) {
301                    row.unwrap();
302                }
303            }
304        }
305        assert!(events_num > 0);
306        timeout(Duration::from_secs(10), binlog_stream.close())
307            .await
308            .unwrap()
309            .unwrap();
310
311        if !is_mariadb {
312            // iterate using COM_BINLOG_DUMP_GTID
313            let (conn, filename, pos) = create_binlog_stream_conn(pool).await.unwrap();
314
315            let mut binlog_stream = conn
316                .get_binlog_stream(
317                    BinlogStreamRequest::new(binlog_server_ids.1)
318                        .with_gtid()
319                        .with_filename(&filename)
320                        .with_pos(pos),
321                )
322                .await
323                .unwrap();
324
325            events_num = 0;
326            while let Ok(Some(event)) = timeout(Duration::from_secs(10), binlog_stream.next()).await
327            {
328                let event = event.unwrap();
329                events_num += 1;
330
331                // assert that event type is known
332                event.header().event_type().unwrap();
333
334                // iterate over rows of an event
335                if let EventData::RowsEvent(re) = event.read_data()?.unwrap() {
336                    let tme = binlog_stream.get_tme(re.table_id());
337                    for row in re.rows(tme.unwrap()) {
338                        row.unwrap();
339                    }
340                }
341            }
342            assert!(events_num > 0);
343            timeout(Duration::from_secs(10), binlog_stream.close())
344                .await
345                .unwrap()
346                .unwrap();
347        }
348
349        // iterate using COM_BINLOG_DUMP with BINLOG_DUMP_NON_BLOCK flag
350        let (conn, filename, pos) = create_binlog_stream_conn(pool).await.unwrap();
351
352        let mut binlog_stream = conn
353            .get_binlog_stream(
354                BinlogStreamRequest::new(binlog_server_ids.2)
355                    .with_filename(&filename)
356                    .with_pos(pos)
357                    .with_non_blocking(),
358            )
359            .await
360            .unwrap();
361
362        events_num = 0;
363        while let Some(event) = binlog_stream.next().await {
364            let event = event.unwrap();
365            events_num += 1;
366            event.header().event_type().unwrap();
367            event.read_data().unwrap();
368        }
369        assert!(events_num > 0);
370        timeout(Duration::from_secs(10), binlog_stream.close())
371            .await
372            .unwrap()
373            .unwrap();
374
375        Ok(())
376    }
377}