#[allow(clippy::default_trait_access)] mod proto { include!(concat!(env!("OUT_DIR"), "/logproto.rs")); } use anyhow::Result; use bytes::{Bytes, BytesMut}; use prost::Message; use snap::raw::{max_compress_len, Encoder as SnappyEncoder}; pub fn create_push_request( snappy: &mut SnappyEncoder, encode_buf: &mut BytesMut, compress_buf: &mut BytesMut, entries: &mut Vec<(i64, String)>, ) -> Result { let entries = entries .drain(..) .map(|(timestamp, line)| proto::EntryAdapter { timestamp: Some(prost_types::Timestamp { seconds: timestamp, nanos: 0, }), line, }) .collect(); let req = proto::PushRequest { streams: vec![proto::StreamAdapter { labels: "{namespace=\"iptables\"}".to_string(), entries, }], }; // encode the PushRequest let encoded_len = req.encoded_len(); if encode_buf.capacity() < encoded_len { encode_buf.reserve(encoded_len); } req.encode(encode_buf)?; let encoded_bytes = encode_buf.split().freeze(); // compress the PushRequest let max_compress_len = max_compress_len(encoded_len); if compress_buf.len() < max_compress_len { // we resize the buffer instead of reserving capacity because we need // the array to be initialized so when we pass the backing `&mut [u8]` // the whole capacity is ready to be written into compress_buf.resize(max_compress_len, 0); } let len = snappy.compress(&encoded_bytes, &mut compress_buf[..])?; let compressed_bytes = compress_buf.split_to(len).freeze(); Ok(compressed_bytes) }