tiberius/tds/codec/
bulk_load.rs

1use asynchronous_codec::BytesMut;
2use futures_util::io::{AsyncRead, AsyncWrite};
3use tracing::{event, Level};
4
5use crate::{
6    client::Connection, sql_read_bytes::SqlReadBytes, BytesMutWithDataColumns, ExecuteResult,
7};
8
9use super::{
10    Encode, MetaDataColumn, PacketHeader, PacketStatus, TokenColMetaData, TokenDone, TokenRow,
11    HEADER_BYTES,
12};
13
14/// A handler for a bulk insert data flow.
15#[derive(Debug)]
16pub struct BulkLoadRequest<'a, S>
17where
18    S: AsyncRead + AsyncWrite + Unpin + Send,
19{
20    connection: &'a mut Connection<S>,
21    packet_id: u8,
22    buf: BytesMut,
23    columns: Vec<MetaDataColumn<'a>>,
24}
25
26impl<'a, S> BulkLoadRequest<'a, S>
27where
28    S: AsyncRead + AsyncWrite + Unpin + Send,
29{
30    pub(crate) fn new(
31        connection: &'a mut Connection<S>,
32        columns: Vec<MetaDataColumn<'a>>,
33    ) -> crate::Result<Self> {
34        let packet_id = connection.context_mut().next_packet_id();
35        let mut buf = BytesMut::new();
36
37        let cmd = TokenColMetaData {
38            columns: columns.clone(),
39        };
40
41        cmd.encode(&mut buf)?;
42
43        let this = Self {
44            connection,
45            packet_id,
46            buf,
47            columns,
48        };
49
50        Ok(this)
51    }
52
53    /// Adds a new row to the bulk insert, flushing only when having a full packet of data.
54    ///
55    /// # Warning
56    ///
57    /// After the last row, [`finalize`] must be called to flush the buffered
58    /// data and for the data to actually be available in the table.
59    ///
60    /// [`finalize`]: #method.finalize
61    pub async fn send(&mut self, row: TokenRow<'a>) -> crate::Result<()> {
62        let mut buf_with_columns = BytesMutWithDataColumns::new(&mut self.buf, &self.columns);
63
64        row.encode(&mut buf_with_columns)?;
65        self.write_packets().await?;
66
67        Ok(())
68    }
69
70    /// Ends the bulk load, flushing all pending data to the wire.
71    ///
72    /// This method must be called after sending all the data to flush all
73    /// pending data and to get the server actually to store the rows to the
74    /// table.
75    pub async fn finalize(mut self) -> crate::Result<ExecuteResult> {
76        TokenDone::default().encode(&mut self.buf)?;
77        self.write_packets().await?;
78
79        let mut header = PacketHeader::bulk_load(self.packet_id);
80        header.set_status(PacketStatus::EndOfMessage);
81
82        let data = self.buf.split();
83
84        event!(
85            Level::TRACE,
86            "Finalizing a bulk insert ({} bytes)",
87            data.len() + HEADER_BYTES,
88        );
89
90        self.connection.write_to_wire(header, data).await?;
91        self.connection.flush_sink().await?;
92
93        ExecuteResult::new(self.connection).await
94    }
95
96    async fn write_packets(&mut self) -> crate::Result<()> {
97        let packet_size = (self.connection.context().packet_size() as usize) - HEADER_BYTES;
98
99        while self.buf.len() > packet_size {
100            let header = PacketHeader::bulk_load(self.packet_id);
101            let data = self.buf.split_to(packet_size);
102
103            event!(
104                Level::TRACE,
105                "Bulk insert packet ({} bytes)",
106                data.len() + HEADER_BYTES,
107            );
108
109            self.connection.write_to_wire(header, data).await?;
110        }
111
112        Ok(())
113    }
114}