From 031b90cab331c81dbaeff67b40003595697b8492 Mon Sep 17 00:00:00 2001 From: Jordan Doyle Date: Fri, 7 Aug 2020 04:32:09 +0100 Subject: [PATCH] Prevent buffer reallocations when generating Loki requests --- Cargo.lock | 118 +++++++++------------------------------------------------------------------------------------------------------------- Cargo.toml | 4 ++-- src/loki.rs | 48 ++++++++++++++++++++++++++++++++++-------------- src/main.rs | 115 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++------------------------------------------------- 4 files changed, 111 insertions(+), 174 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index dd1b148..a6a99c9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -81,16 +81,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822" [[package]] -name = "chashmap" -version = "2.2.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ff41a3c2c1e39921b9003de14bf0439c7b63a9039637c291e1a64925d8ddfa45" -dependencies = [ - "owning_ref", - "parking_lot", -] - -[[package]] name = "chrono" version = "0.4.11" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -232,12 +222,6 @@ dependencies = [ ] [[package]] -name = "fuchsia-cprng" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a06f77d526c1a601b7c4cdd98f54b5eaabffc14d5f2f0296febdc7f357c6d3ba" - -[[package]] name = "fuchsia-zircon" version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -556,9 +540,9 @@ checksum = "7ffc5c5338469d4d3ea17d269fa8ea3512ad247247c30bd2df69e68309ed0a08" [[package]] name = "maxminddb" -version = "0.13.0" +version = "0.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9412a854bf1355d1ff92ef6ffe557dcc4a866e20cdffc7d3fc082174dba7436e" +checksum = "a9d4f312870f536f2f87d6afe80570c3df890bd9b9985dfadd3c2e9bba183d80" dependencies = [ "log", "memmap", @@ -710,13 +694,12 @@ dependencies = [ [[package]] name = "notify" -version = "5.0.0-pre.2" +version = "5.0.0-pre.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b00c0b65188bffb5598c302e19b062feb94adef02c31f15622a163c95d673c3" +checksum = "77d03607cf88b4b160ba0e9ed425fff3cee3b55ac813f0c685b3a3772da37d0e" dependencies = [ "anymap", "bitflags", - "chashmap", "crossbeam-channel", "filetime", "fsevent", @@ -765,37 +748,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "06de47b848347d8c4c94219ad8ecd35eb90231704b067e67e6ae2e36ee023510" [[package]] -name = "owning_ref" -version = "0.3.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cdf84f41639e037b484f93433aa3897863b561ed65c6e59c7073d7c561710f37" -dependencies = [ - "stable_deref_trait", -] - -[[package]] -name = "parking_lot" -version = "0.4.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "149d8f5b97f3c1133e3cfcd8886449959e856b557ff281e292b733d7c69e005e" -dependencies = [ - "owning_ref", - "parking_lot_core", -] - -[[package]] -name = "parking_lot_core" -version = "0.2.14" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4db1a8ccf734a7bce794cc19b3df06ed87ab2f3907036b693c68f56b4d4537fa" -dependencies = [ - "libc", - "rand 0.4.6", - "smallvec 0.6.13", - "winapi 0.3.8", -] - -[[package]] name = "percent-encoding" version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -952,19 +904,6 @@ dependencies = [ [[package]] name = "rand" -version = "0.4.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "552840b97013b1a26992c11eac34bdd778e464601a4c2054b5f0bff7c6761293" -dependencies = [ - "fuchsia-cprng", - "libc", - "rand_core 0.3.1", - "rdrand", - "winapi 0.3.8", -] - -[[package]] -name = "rand" version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6a6b1679d49b24bbfe0c803429aa1874472f50d9b363131f0e89fc356b544d03" @@ -972,7 +911,7 @@ dependencies = [ "getrandom", "libc", "rand_chacha", - "rand_core 0.5.1", + "rand_core", "rand_hc", ] @@ -983,26 +922,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f4c8ed856279c9737206bf725bf36935d8666ead7aa69b52be55af369d193402" dependencies = [ "ppv-lite86", - "rand_core 0.5.1", + "rand_core", ] [[package]] name = "rand_core" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a6fdeb83b075e8266dcc8762c22776f6877a63111121f5f8c7411e5be7eed4b" -dependencies = [ - "rand_core 0.4.2", -] - -[[package]] -name = "rand_core" -version = "0.4.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c33a3c44ca05fa6f1807d8e6743f3824e8509beca625669633be0acbdf509dc" - -[[package]] -name = "rand_core" version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "90bde5296fc891b0cef12a6d03ddccc162ce7b2aff54160af9338f8d40df6d19" @@ -1016,16 +940,7 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ca3129af7b92a17112d59ad498c6f81eaf463253766b90396d39ea7a39d6613c" dependencies = [ - "rand_core 0.5.1", -] - -[[package]] -name = "rdrand" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "678054eb77286b51581ba43620cc911abf02758c91f93f479767aed0f90458b2" -dependencies = [ - "rand_core 0.3.1", + "rand_core", ] [[package]] @@ -1166,15 +1081,6 @@ checksum = "c111b5bd5695e56cffe5129854aa230b39c93a305372fdbb2668ca2394eea9f8" [[package]] name = "smallvec" -version = "0.6.13" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f7b0758c52e15a8b5e3691eae6cc559f08eee9406e548a4477ba4e67770a82b6" -dependencies = [ - "maybe-uninit", -] - -[[package]] -name = "smallvec" version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c7cb5678e1615754284ec264d9bb5b4c27d2018577fd90ac0ceb578591ed5ee4" @@ -1198,12 +1104,6 @@ dependencies = [ ] [[package]] -name = "stable_deref_trait" -version = "1.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dba1a27d3efae4351c8051072d619e3ade2820635c3958d826bfea39d59b54c8" - -[[package]] name = "static_assertions" version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1245,7 +1145,7 @@ checksum = "7a6e24d9338a0a5be79593e2fa15a648add6138caa803e2d5bc782c371732ca9" dependencies = [ "cfg-if", "libc", - "rand 0.7.3", + "rand", "redox_syscall", "remove_dir_all", "winapi 0.3.8", @@ -1382,7 +1282,7 @@ version = "0.1.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5479532badd04e128284890390c1e876ef7a993d0570b3597ae43dfa1d59afa4" dependencies = [ - "smallvec 1.4.0", + "smallvec", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 7bbe469..a92229a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,9 +7,9 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -notify = "5.0.0-pre.2" +notify = "5.0.0-pre.3" crossbeam-channel = "0.4" -maxminddb = { version = "0.13", features = ["mmap"] } +maxminddb = { version = "0.14", features = ["mmap"] } clap = "3.0.0-beta.1" env_logger = "0.7" diff --git a/src/loki.rs b/src/loki.rs index f0871cf..7a5cd66 100644 --- a/src/loki.rs +++ b/src/loki.rs @@ -6,32 +6,52 @@ mod proto { use anyhow::Result; use bytes::{Bytes, BytesMut}; use prost::Message; +use snap::raw::{max_compress_len, Encoder as SnappyEncoder}; -pub fn create_push_request(entries: Vec<(i64, String)>) -> Result { - let mut entries_transformed = Vec::new(); - for (timestamp, line) in entries { - entries_transformed.push(proto::EntryAdapter { +pub fn create_push_request( + snappy: &mut SnappyEncoder, + encode_buf: &mut BytesMut, + compress_buf: &mut BytesMut, + entries: &mut Vec<(i64, String)>, +) -> Result { + let entries = entries + .drain(..) + .map(|(timestamp, line)| proto::EntryAdapter { timestamp: Some(prost_types::Timestamp { seconds: timestamp, nanos: 0, }), line, - }); - } + }) + .collect(); let req = proto::PushRequest { streams: vec![proto::StreamAdapter { labels: "{namespace=\"iptables\"}".to_string(), - entries: entries_transformed, + entries, }], }; - let mut s = BytesMut::new(); - req.encode(&mut s)?; - let s = s.freeze(); + // encode the PushRequest + let encoded_len = req.encoded_len(); + if encode_buf.capacity() < encoded_len { + encode_buf.reserve(encoded_len); + } + + req.encode(encode_buf)?; + let encoded_bytes = encode_buf.split().freeze(); + + // compress the PushRequest + let max_compress_len = max_compress_len(encoded_len); + if compress_buf.len() < max_compress_len { + // we resize the buffer instead of reserving capacity because we need + // the array to be initialized so when we pass the backing `&mut [u8]` + // the whole capacity is ready to be written into + compress_buf.resize(max_compress_len, 0); + } + + let len = snappy.compress(&encoded_bytes, &mut compress_buf[..])?; + let compressed_bytes = compress_buf.split_to(len).freeze(); - // TODO: can we do this without the copy? - Ok(Bytes::from( - snap::raw::Encoder::new().compress_vec(s.as_ref())?, - )) + Ok(compressed_bytes) } diff --git a/src/main.rs b/src/main.rs index 2bd5c1c..1d6b50e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -5,6 +5,7 @@ mod loki; mod parser; use crate::parser::Log; +use bytes::BytesMut; use chrono::DateTime; use chrono::Utc; use clap::Clap; @@ -23,6 +24,7 @@ use notify::{ event::ModifyKind, immediate_watcher, Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher, }; +use snap::raw::Encoder as SnappyEncoder; use anyhow::Result; @@ -86,24 +88,22 @@ struct FirewallEntry<'a> { proto: &'a str, asn: Option, - asn_org: Option<&'a String>, - city: Option<&'a String>, - country_code: Option<&'a String>, - country: Option<&'a String>, + asn_org: Option<&'a str>, + city: Option<&'a str>, + country_code: Option<&'a str>, + country: Option<&'a str>, lat: Option, lng: Option, } impl<'a> FirewallEntry<'a> { fn from( log: &'a Log<'a>, - city: &'a Option, - country: &'a Option, - asn: &'a Option, + city: Option<&geoip2::City<'a>>, + country: Option<&geoip2::Country<'a>>, + asn: Option<&geoip2::Asn<'a>>, ) -> Result> { - let country = country - .as_ref() - .and_then(|country| country.country.as_ref()); - let location = city.as_ref().and_then(|city| city.location.as_ref()); + let country = country.and_then(|country| country.country.as_ref()); + let location = city.and_then(|city| city.location.as_ref()); Ok(FirewallEntry { time: &log.time, @@ -118,19 +118,18 @@ impl<'a> FirewallEntry<'a> { dst_port: log.values.get("DPT").unwrap_or(&"").parse()?, proto: log.values.get("PROTO").unwrap_or(&""), - asn: asn.as_ref().and_then(|asn| asn.autonomous_system_number), - asn_org: asn - .as_ref() - .and_then(|asn| asn.autonomous_system_organization.as_ref()), + asn: asn.and_then(|asn| asn.autonomous_system_number), + asn_org: asn.and_then(|asn| asn.autonomous_system_organization), city: city - .as_ref() .and_then(|city| city.city.as_ref()) .and_then(|city| city.names.as_ref()) - .and_then(|names| names.get("en")), - country_code: country.and_then(|country| country.iso_code.as_ref()), + .and_then(|names| names.get("en").or_else(|| names.values().next())) + .copied(), + country_code: country.and_then(|country| country.iso_code), country: country .and_then(|country| country.names.as_ref()) - .and_then(|names| names.get("en")), + .and_then(|names| names.get("en").or_else(|| names.values().next())) + .copied(), lat: location.and_then(|loc| loc.latitude), lng: location.and_then(|loc| loc.longitude), }) @@ -154,13 +153,13 @@ impl Display for FirewallEntry<'_> { self.dst, self.dst_port, self.proto, - self.asn.as_ref().map(ToString::to_string).unwrap_or_default(), - self.asn_org.as_ref().map(ToString::to_string).unwrap_or_default(), - self.city.as_ref().map(ToString::to_string).unwrap_or_default(), - self.country_code.as_ref().map(ToString::to_string).unwrap_or_default(), - self.country.as_ref().map(ToString::to_string).unwrap_or_default(), - self.lat.as_ref().map(ToString::to_string).unwrap_or_default(), - self.lng.as_ref().map(ToString::to_string).unwrap_or_default(), + self.asn.unwrap_or_default(), + self.asn_org.unwrap_or_default(), + self.city.unwrap_or_default(), + self.country_code.unwrap_or_default(), + self.country.unwrap_or_default(), + self.lat.unwrap_or_default(), + self.lng.unwrap_or_default(), ) } } @@ -209,10 +208,16 @@ async fn main() { let city_db = open_geoip_reader(config.geoip.city_db.as_ref()); let country_db = open_geoip_reader(config.geoip.country_db.as_ref()); + let mut snappy = SnappyEncoder::new(); + let mut encode_buf = BytesMut::new(); + let mut compress_buf = BytesMut::new(); + let mut reader = open_log_file_or_exit(); let mut buf = String::new(); let client = reqwest::Client::new(); + let mut entries: Vec<_> = Vec::with_capacity(8); + loop { buf.clear(); let read_bytes = match reader.reader.read_line(&mut buf) { @@ -260,33 +265,45 @@ async fn main() { country_db.as_ref().and_then(|db| db.lookup(ip).ok()); let timestamp = log_line.time.timestamp(); - let entry = match FirewallEntry::from(&log_line, &city, &country, &asn) { - Ok(v) => v.to_string(), - Err(e) => { - error!("Failed to build firewall entry for log: {:?}", e); - continue; - } - }; + let entry = + match FirewallEntry::from(&log_line, city.as_ref(), country.as_ref(), asn.as_ref()) + { + Ok(v) => v.to_string(), + Err(e) => { + error!("Failed to build firewall entry for log: {:?}", e); + continue; + } + }; - let req = match crate::loki::create_push_request(vec![(timestamp, entry)]) { - Ok(v) => v, - Err(e) => { - error!("Error creating a Loki push request: {:?}", e); - continue; - } - }; + entries.push((timestamp, entry)); + + // once the vec reaches capacity, flush to loki + if entries.len() == entries.capacity() { + let req = match crate::loki::create_push_request( + &mut snappy, + &mut encode_buf, + &mut compress_buf, + &mut entries, + ) { + Ok(v) => v, + Err(e) => { + error!("Error creating a Loki push request: {:?}", e); + continue; + } + }; - match client.post(&config.loki.push_url).body(req).send().await { - Ok(resp) => { - if !resp.status().is_success() { - error!( - "Error pushing log to Loki ({}): {:?}", - resp.status(), - resp.text().await - ); + match client.post(&config.loki.push_url).body(req).send().await { + Ok(resp) => { + if !resp.status().is_success() { + error!( + "Error pushing log to Loki ({}): {:?}", + resp.status(), + resp.text().await + ); + } } + Err(e) => error!("Error pushing log to Loki: {:?}", e), } - Err(e) => error!("Error pushing log to Loki: {:?}", e), } } } -- libgit2 1.7.2