tiberius/tds/codec/
bulk_load.rs
1use asynchronous_codec::BytesMut;
2use futures_util::io::{AsyncRead, AsyncWrite};
3use tracing::{event, Level};
4
5use crate::{
6 client::Connection, sql_read_bytes::SqlReadBytes, BytesMutWithDataColumns, ExecuteResult,
7};
8
9use super::{
10 Encode, MetaDataColumn, PacketHeader, PacketStatus, TokenColMetaData, TokenDone, TokenRow,
11 HEADER_BYTES,
12};
13
14#[derive(Debug)]
16pub struct BulkLoadRequest<'a, S>
17where
18 S: AsyncRead + AsyncWrite + Unpin + Send,
19{
20 connection: &'a mut Connection<S>,
21 packet_id: u8,
22 buf: BytesMut,
23 columns: Vec<MetaDataColumn<'a>>,
24}
25
26impl<'a, S> BulkLoadRequest<'a, S>
27where
28 S: AsyncRead + AsyncWrite + Unpin + Send,
29{
30 pub(crate) fn new(
31 connection: &'a mut Connection<S>,
32 columns: Vec<MetaDataColumn<'a>>,
33 ) -> crate::Result<Self> {
34 let packet_id = connection.context_mut().next_packet_id();
35 let mut buf = BytesMut::new();
36
37 let cmd = TokenColMetaData {
38 columns: columns.clone(),
39 };
40
41 cmd.encode(&mut buf)?;
42
43 let this = Self {
44 connection,
45 packet_id,
46 buf,
47 columns,
48 };
49
50 Ok(this)
51 }
52
53 pub async fn send(&mut self, row: TokenRow<'a>) -> crate::Result<()> {
62 let mut buf_with_columns = BytesMutWithDataColumns::new(&mut self.buf, &self.columns);
63
64 row.encode(&mut buf_with_columns)?;
65 self.write_packets().await?;
66
67 Ok(())
68 }
69
70 pub async fn finalize(mut self) -> crate::Result<ExecuteResult> {
76 TokenDone::default().encode(&mut self.buf)?;
77 self.write_packets().await?;
78
79 let mut header = PacketHeader::bulk_load(self.packet_id);
80 header.set_status(PacketStatus::EndOfMessage);
81
82 let data = self.buf.split();
83
84 event!(
85 Level::TRACE,
86 "Finalizing a bulk insert ({} bytes)",
87 data.len() + HEADER_BYTES,
88 );
89
90 self.connection.write_to_wire(header, data).await?;
91 self.connection.flush_sink().await?;
92
93 ExecuteResult::new(self.connection).await
94 }
95
96 async fn write_packets(&mut self) -> crate::Result<()> {
97 let packet_size = (self.connection.context().packet_size() as usize) - HEADER_BYTES;
98
99 while self.buf.len() > packet_size {
100 let header = PacketHeader::bulk_load(self.packet_id);
101 let data = self.buf.split_to(packet_size);
102
103 event!(
104 Level::TRACE,
105 "Bulk insert packet ({} bytes)",
106 data.len() + HEADER_BYTES,
107 );
108
109 self.connection.write_to_wire(header, data).await?;
110 }
111
112 Ok(())
113 }
114}