Prevent buffer reallocations when generating Loki requests
Diff
Cargo.lock | 118 +++++++++++---------------------------------------------------------------------
Cargo.toml | 4 ++--
src/loki.rs | 50 +++++++++++++++++++++++++++++++++++++++++---------
src/main.rs | 119 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++--------------
4 files changed, 114 insertions(+), 177 deletions(-)
@@ -81,16 +81,6 @@
checksum = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822"
[[package]]
name = "chashmap"
version = "2.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ff41a3c2c1e39921b9003de14bf0439c7b63a9039637c291e1a64925d8ddfa45"
dependencies = [
"owning_ref",
"parking_lot",
]
[[package]]
name = "chrono"
version = "0.4.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -230,12 +220,6 @@
dependencies = [
"libc",
]
[[package]]
name = "fuchsia-cprng"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a06f77d526c1a601b7c4cdd98f54b5eaabffc14d5f2f0296febdc7f357c6d3ba"
[[package]]
name = "fuchsia-zircon"
@@ -556,9 +540,9 @@
[[package]]
name = "maxminddb"
version = "0.13.0"
version = "0.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9412a854bf1355d1ff92ef6ffe557dcc4a866e20cdffc7d3fc082174dba7436e"
checksum = "a9d4f312870f536f2f87d6afe80570c3df890bd9b9985dfadd3c2e9bba183d80"
dependencies = [
"log",
"memmap",
@@ -710,13 +694,12 @@
[[package]]
name = "notify"
version = "5.0.0-pre.2"
version = "5.0.0-pre.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b00c0b65188bffb5598c302e19b062feb94adef02c31f15622a163c95d673c3"
checksum = "77d03607cf88b4b160ba0e9ed425fff3cee3b55ac813f0c685b3a3772da37d0e"
dependencies = [
"anymap",
"bitflags",
"chashmap",
"crossbeam-channel",
"filetime",
"fsevent",
@@ -763,37 +746,6 @@
version = "2.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "06de47b848347d8c4c94219ad8ecd35eb90231704b067e67e6ae2e36ee023510"
[[package]]
name = "owning_ref"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cdf84f41639e037b484f93433aa3897863b561ed65c6e59c7073d7c561710f37"
dependencies = [
"stable_deref_trait",
]
[[package]]
name = "parking_lot"
version = "0.4.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "149d8f5b97f3c1133e3cfcd8886449959e856b557ff281e292b733d7c69e005e"
dependencies = [
"owning_ref",
"parking_lot_core",
]
[[package]]
name = "parking_lot_core"
version = "0.2.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4db1a8ccf734a7bce794cc19b3df06ed87ab2f3907036b693c68f56b4d4537fa"
dependencies = [
"libc",
"rand 0.4.6",
"smallvec 0.6.13",
"winapi 0.3.8",
]
[[package]]
name = "percent-encoding"
@@ -948,19 +900,6 @@
checksum = "54a21852a652ad6f610c9510194f398ff6f8692e334fd1145fed931f7fbe44ea"
dependencies = [
"proc-macro2",
]
[[package]]
name = "rand"
version = "0.4.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "552840b97013b1a26992c11eac34bdd778e464601a4c2054b5f0bff7c6761293"
dependencies = [
"fuchsia-cprng",
"libc",
"rand_core 0.3.1",
"rdrand",
"winapi 0.3.8",
]
[[package]]
@@ -972,7 +911,7 @@
"getrandom",
"libc",
"rand_chacha",
"rand_core 0.5.1",
"rand_core",
"rand_hc",
]
@@ -983,23 +922,8 @@
checksum = "f4c8ed856279c9737206bf725bf36935d8666ead7aa69b52be55af369d193402"
dependencies = [
"ppv-lite86",
"rand_core 0.5.1",
]
[[package]]
name = "rand_core"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7a6fdeb83b075e8266dcc8762c22776f6877a63111121f5f8c7411e5be7eed4b"
dependencies = [
"rand_core 0.4.2",
"rand_core",
]
[[package]]
name = "rand_core"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c33a3c44ca05fa6f1807d8e6743f3824e8509beca625669633be0acbdf509dc"
[[package]]
name = "rand_core"
@@ -1015,17 +939,8 @@
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ca3129af7b92a17112d59ad498c6f81eaf463253766b90396d39ea7a39d6613c"
dependencies = [
"rand_core 0.5.1",
]
[[package]]
name = "rdrand"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "678054eb77286b51581ba43620cc911abf02758c91f93f479767aed0f90458b2"
dependencies = [
"rand_core 0.3.1",
"rand_core",
]
[[package]]
@@ -1163,15 +1078,6 @@
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c111b5bd5695e56cffe5129854aa230b39c93a305372fdbb2668ca2394eea9f8"
[[package]]
name = "smallvec"
version = "0.6.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f7b0758c52e15a8b5e3691eae6cc559f08eee9406e548a4477ba4e67770a82b6"
dependencies = [
"maybe-uninit",
]
[[package]]
name = "smallvec"
@@ -1196,12 +1102,6 @@
"redox_syscall",
"winapi 0.3.8",
]
[[package]]
name = "stable_deref_trait"
version = "1.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dba1a27d3efae4351c8051072d619e3ade2820635c3958d826bfea39d59b54c8"
[[package]]
name = "static_assertions"
@@ -1245,7 +1145,7 @@
dependencies = [
"cfg-if",
"libc",
"rand 0.7.3",
"rand",
"redox_syscall",
"remove_dir_all",
"winapi 0.3.8",
@@ -1382,7 +1282,7 @@
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5479532badd04e128284890390c1e876ef7a993d0570b3597ae43dfa1d59afa4"
dependencies = [
"smallvec 1.4.0",
"smallvec",
]
[[package]]
@@ -7,9 +7,9 @@
[dependencies]
notify = "5.0.0-pre.2"
notify = "5.0.0-pre.3"
crossbeam-channel = "0.4"
maxminddb = { version = "0.13", features = ["mmap"] }
maxminddb = { version = "0.14", features = ["mmap"] }
clap = "3.0.0-beta.1"
env_logger = "0.7"
@@ -6,32 +6,52 @@
use anyhow::Result;
use bytes::{Bytes, BytesMut};
use prost::Message;
use snap::raw::{max_compress_len, Encoder as SnappyEncoder};
pub fn create_push_request(entries: Vec<(i64, String)>) -> Result<Bytes> {
let mut entries_transformed = Vec::new();
for (timestamp, line) in entries {
entries_transformed.push(proto::EntryAdapter {
pub fn create_push_request(
snappy: &mut SnappyEncoder,
encode_buf: &mut BytesMut,
compress_buf: &mut BytesMut,
entries: &mut Vec<(i64, String)>,
) -> Result<Bytes> {
let entries = entries
.drain(..)
.map(|(timestamp, line)| proto::EntryAdapter {
timestamp: Some(prost_types::Timestamp {
seconds: timestamp,
nanos: 0,
}),
line,
});
}
})
.collect();
let req = proto::PushRequest {
streams: vec![proto::StreamAdapter {
labels: "{namespace=\"iptables\"}".to_string(),
entries: entries_transformed,
entries,
}],
};
let encoded_len = req.encoded_len();
if encode_buf.capacity() < encoded_len {
encode_buf.reserve(encoded_len);
}
req.encode(encode_buf)?;
let encoded_bytes = encode_buf.split().freeze();
let max_compress_len = max_compress_len(encoded_len);
if compress_buf.len() < max_compress_len {
compress_buf.resize(max_compress_len, 0);
}
let len = snappy.compress(&encoded_bytes, &mut compress_buf[..])?;
let compressed_bytes = compress_buf.split_to(len).freeze();
let mut s = BytesMut::new();
req.encode(&mut s)?;
let s = s.freeze();
Ok(Bytes::from(
snap::raw::Encoder::new().compress_vec(s.as_ref())?,
))
Ok(compressed_bytes)
}
@@ -5,6 +5,7 @@
mod parser;
use crate::parser::Log;
use bytes::BytesMut;
use chrono::DateTime;
use chrono::Utc;
use clap::Clap;
@@ -23,6 +24,7 @@
event::ModifyKind, immediate_watcher, Event, EventKind, RecommendedWatcher, RecursiveMode,
Watcher,
};
use snap::raw::Encoder as SnappyEncoder;
use anyhow::Result;
@@ -86,24 +88,22 @@
proto: &'a str,
asn: Option<u32>,
asn_org: Option<&'a String>,
city: Option<&'a String>,
country_code: Option<&'a String>,
country: Option<&'a String>,
asn_org: Option<&'a str>,
city: Option<&'a str>,
country_code: Option<&'a str>,
country: Option<&'a str>,
lat: Option<f64>,
lng: Option<f64>,
}
impl<'a> FirewallEntry<'a> {
fn from(
log: &'a Log<'a>,
city: &'a Option<geoip2::City>,
country: &'a Option<geoip2::Country>,
asn: &'a Option<geoip2::Asn>,
city: Option<&geoip2::City<'a>>,
country: Option<&geoip2::Country<'a>>,
asn: Option<&geoip2::Asn<'a>>,
) -> Result<FirewallEntry<'a>> {
let country = country
.as_ref()
.and_then(|country| country.country.as_ref());
let location = city.as_ref().and_then(|city| city.location.as_ref());
let country = country.and_then(|country| country.country.as_ref());
let location = city.and_then(|city| city.location.as_ref());
Ok(FirewallEntry {
time: &log.time,
@@ -118,19 +118,18 @@
dst_port: log.values.get("DPT").unwrap_or(&"").parse()?,
proto: log.values.get("PROTO").unwrap_or(&""),
asn: asn.as_ref().and_then(|asn| asn.autonomous_system_number),
asn_org: asn
.as_ref()
.and_then(|asn| asn.autonomous_system_organization.as_ref()),
asn: asn.and_then(|asn| asn.autonomous_system_number),
asn_org: asn.and_then(|asn| asn.autonomous_system_organization),
city: city
.as_ref()
.and_then(|city| city.city.as_ref())
.and_then(|city| city.names.as_ref())
.and_then(|names| names.get("en")),
country_code: country.and_then(|country| country.iso_code.as_ref()),
.and_then(|names| names.get("en").or_else(|| names.values().next()))
.copied(),
country_code: country.and_then(|country| country.iso_code),
country: country
.and_then(|country| country.names.as_ref())
.and_then(|names| names.get("en")),
.and_then(|names| names.get("en").or_else(|| names.values().next()))
.copied(),
lat: location.and_then(|loc| loc.latitude),
lng: location.and_then(|loc| loc.longitude),
})
@@ -154,13 +153,13 @@
self.dst,
self.dst_port,
self.proto,
self.asn.as_ref().map(ToString::to_string).unwrap_or_default(),
self.asn_org.as_ref().map(ToString::to_string).unwrap_or_default(),
self.city.as_ref().map(ToString::to_string).unwrap_or_default(),
self.country_code.as_ref().map(ToString::to_string).unwrap_or_default(),
self.country.as_ref().map(ToString::to_string).unwrap_or_default(),
self.lat.as_ref().map(ToString::to_string).unwrap_or_default(),
self.lng.as_ref().map(ToString::to_string).unwrap_or_default(),
self.asn.unwrap_or_default(),
self.asn_org.unwrap_or_default(),
self.city.unwrap_or_default(),
self.country_code.unwrap_or_default(),
self.country.unwrap_or_default(),
self.lat.unwrap_or_default(),
self.lng.unwrap_or_default(),
)
}
}
@@ -209,9 +208,15 @@
let city_db = open_geoip_reader(config.geoip.city_db.as_ref());
let country_db = open_geoip_reader(config.geoip.country_db.as_ref());
let mut snappy = SnappyEncoder::new();
let mut encode_buf = BytesMut::new();
let mut compress_buf = BytesMut::new();
let mut reader = open_log_file_or_exit();
let mut buf = String::new();
let client = reqwest::Client::new();
let mut entries: Vec<_> = Vec::with_capacity(8);
loop {
buf.clear();
@@ -260,33 +265,45 @@
country_db.as_ref().and_then(|db| db.lookup(ip).ok());
let timestamp = log_line.time.timestamp();
let entry = match FirewallEntry::from(&log_line, &city, &country, &asn) {
Ok(v) => v.to_string(),
Err(e) => {
error!("Failed to build firewall entry for log: {:?}", e);
continue;
}
};
let req = match crate::loki::create_push_request(vec![(timestamp, entry)]) {
Ok(v) => v,
Err(e) => {
error!("Error creating a Loki push request: {:?}", e);
continue;
}
};
match client.post(&config.loki.push_url).body(req).send().await {
Ok(resp) => {
if !resp.status().is_success() {
error!(
"Error pushing log to Loki ({}): {:?}",
resp.status(),
resp.text().await
);
let entry =
match FirewallEntry::from(&log_line, city.as_ref(), country.as_ref(), asn.as_ref())
{
Ok(v) => v.to_string(),
Err(e) => {
error!("Failed to build firewall entry for log: {:?}", e);
continue;
}
};
entries.push((timestamp, entry));
if entries.len() == entries.capacity() {
let req = match crate::loki::create_push_request(
&mut snappy,
&mut encode_buf,
&mut compress_buf,
&mut entries,
) {
Ok(v) => v,
Err(e) => {
error!("Error creating a Loki push request: {:?}", e);
continue;
}
};
match client.post(&config.loki.push_url).body(req).send().await {
Ok(resp) => {
if !resp.status().is_success() {
error!(
"Error pushing log to Loki ({}): {:?}",
resp.status(),
resp.text().await
);
}
}
Err(e) => error!("Error pushing log to Loki: {:?}", e),
}
Err(e) => error!("Error pushing log to Loki: {:?}", e),
}
}
}