🏡 index : ~doyle/fwloki.git

author Jordan Doyle <jordan@doyle.la> 2020-08-07 4:32:09.0 +01:00:00
committer Jordan Doyle <jordan@doyle.la> 2020-08-07 4:32:09.0 +01:00:00
commit
031b90cab331c81dbaeff67b40003595697b8492 [patch]
tree
070d39df300f934ffb17479d78fe456602adc85a
parent
df4f77e75059f2051c4724e2f94d871191e42ae7
download
master.tar.gz

Prevent buffer reallocations when generating Loki requests



Diff

 Cargo.lock  | 118 +++++++++++---------------------------------------------------------------------
 Cargo.toml  |   4 ++--
 src/loki.rs |  50 +++++++++++++++++++++++++++++++++++++++++---------
 src/main.rs | 119 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++--------------
 4 files changed, 114 insertions(+), 177 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index dd1b148..a6a99c9 100644
--- a/Cargo.lock
+++ a/Cargo.lock
@@ -81,16 +81,6 @@
checksum = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822"

[[package]]
name = "chashmap"
version = "2.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ff41a3c2c1e39921b9003de14bf0439c7b63a9039637c291e1a64925d8ddfa45"
dependencies = [
 "owning_ref",
 "parking_lot",
]

[[package]]
name = "chrono"
version = "0.4.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -230,12 +220,6 @@
dependencies = [
 "libc",
]

[[package]]
name = "fuchsia-cprng"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a06f77d526c1a601b7c4cdd98f54b5eaabffc14d5f2f0296febdc7f357c6d3ba"

[[package]]
name = "fuchsia-zircon"
@@ -556,9 +540,9 @@

[[package]]
name = "maxminddb"
version = "0.13.0"
version = "0.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9412a854bf1355d1ff92ef6ffe557dcc4a866e20cdffc7d3fc082174dba7436e"
checksum = "a9d4f312870f536f2f87d6afe80570c3df890bd9b9985dfadd3c2e9bba183d80"
dependencies = [
 "log",
 "memmap",
@@ -710,13 +694,12 @@

[[package]]
name = "notify"
version = "5.0.0-pre.2"
version = "5.0.0-pre.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b00c0b65188bffb5598c302e19b062feb94adef02c31f15622a163c95d673c3"
checksum = "77d03607cf88b4b160ba0e9ed425fff3cee3b55ac813f0c685b3a3772da37d0e"
dependencies = [
 "anymap",
 "bitflags",
 "chashmap",
 "crossbeam-channel",
 "filetime",
 "fsevent",
@@ -763,37 +746,6 @@
version = "2.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "06de47b848347d8c4c94219ad8ecd35eb90231704b067e67e6ae2e36ee023510"

[[package]]
name = "owning_ref"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cdf84f41639e037b484f93433aa3897863b561ed65c6e59c7073d7c561710f37"
dependencies = [
 "stable_deref_trait",
]

[[package]]
name = "parking_lot"
version = "0.4.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "149d8f5b97f3c1133e3cfcd8886449959e856b557ff281e292b733d7c69e005e"
dependencies = [
 "owning_ref",
 "parking_lot_core",
]

[[package]]
name = "parking_lot_core"
version = "0.2.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4db1a8ccf734a7bce794cc19b3df06ed87ab2f3907036b693c68f56b4d4537fa"
dependencies = [
 "libc",
 "rand 0.4.6",
 "smallvec 0.6.13",
 "winapi 0.3.8",
]

[[package]]
name = "percent-encoding"
@@ -948,19 +900,6 @@
checksum = "54a21852a652ad6f610c9510194f398ff6f8692e334fd1145fed931f7fbe44ea"
dependencies = [
 "proc-macro2",
]

[[package]]
name = "rand"
version = "0.4.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "552840b97013b1a26992c11eac34bdd778e464601a4c2054b5f0bff7c6761293"
dependencies = [
 "fuchsia-cprng",
 "libc",
 "rand_core 0.3.1",
 "rdrand",
 "winapi 0.3.8",
]

[[package]]
@@ -972,7 +911,7 @@
 "getrandom",
 "libc",
 "rand_chacha",
 "rand_core 0.5.1",
 "rand_core",
 "rand_hc",
]

@@ -983,23 +922,8 @@
checksum = "f4c8ed856279c9737206bf725bf36935d8666ead7aa69b52be55af369d193402"
dependencies = [
 "ppv-lite86",
 "rand_core 0.5.1",
]

[[package]]
name = "rand_core"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7a6fdeb83b075e8266dcc8762c22776f6877a63111121f5f8c7411e5be7eed4b"
dependencies = [
 "rand_core 0.4.2",
 "rand_core",
]

[[package]]
name = "rand_core"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c33a3c44ca05fa6f1807d8e6743f3824e8509beca625669633be0acbdf509dc"

[[package]]
name = "rand_core"
@@ -1015,17 +939,8 @@
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ca3129af7b92a17112d59ad498c6f81eaf463253766b90396d39ea7a39d6613c"
dependencies = [
 "rand_core 0.5.1",
]

[[package]]
name = "rdrand"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "678054eb77286b51581ba43620cc911abf02758c91f93f479767aed0f90458b2"
dependencies = [
 "rand_core 0.3.1",
 "rand_core",
]

[[package]]
@@ -1163,15 +1078,6 @@
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c111b5bd5695e56cffe5129854aa230b39c93a305372fdbb2668ca2394eea9f8"

[[package]]
name = "smallvec"
version = "0.6.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f7b0758c52e15a8b5e3691eae6cc559f08eee9406e548a4477ba4e67770a82b6"
dependencies = [
 "maybe-uninit",
]

[[package]]
name = "smallvec"
@@ -1196,12 +1102,6 @@
 "redox_syscall",
 "winapi 0.3.8",
]

[[package]]
name = "stable_deref_trait"
version = "1.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dba1a27d3efae4351c8051072d619e3ade2820635c3958d826bfea39d59b54c8"

[[package]]
name = "static_assertions"
@@ -1245,7 +1145,7 @@
dependencies = [
 "cfg-if",
 "libc",
 "rand 0.7.3",
 "rand",
 "redox_syscall",
 "remove_dir_all",
 "winapi 0.3.8",
@@ -1382,7 +1282,7 @@
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5479532badd04e128284890390c1e876ef7a993d0570b3597ae43dfa1d59afa4"
dependencies = [
 "smallvec 1.4.0",
 "smallvec",
]

[[package]]
diff --git a/Cargo.toml b/Cargo.toml
index 7bbe469..a92229a 100644
--- a/Cargo.toml
+++ a/Cargo.toml
@@ -7,9 +7,9 @@
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
notify = "5.0.0-pre.2"
notify = "5.0.0-pre.3"
crossbeam-channel = "0.4"
maxminddb = { version = "0.13", features = ["mmap"] }
maxminddb = { version = "0.14", features = ["mmap"] }

clap = "3.0.0-beta.1"
env_logger = "0.7"
diff --git a/src/loki.rs b/src/loki.rs
index f0871cf..7a5cd66 100644
--- a/src/loki.rs
+++ a/src/loki.rs
@@ -6,32 +6,52 @@
use anyhow::Result;
use bytes::{Bytes, BytesMut};
use prost::Message;
use snap::raw::{max_compress_len, Encoder as SnappyEncoder};

pub fn create_push_request(entries: Vec<(i64, String)>) -> Result<Bytes> {
    let mut entries_transformed = Vec::new();
    for (timestamp, line) in entries {
        entries_transformed.push(proto::EntryAdapter {
pub fn create_push_request(
    snappy: &mut SnappyEncoder,
    encode_buf: &mut BytesMut,
    compress_buf: &mut BytesMut,
    entries: &mut Vec<(i64, String)>,
) -> Result<Bytes> {
    let entries = entries
        .drain(..)
        .map(|(timestamp, line)| proto::EntryAdapter {
            timestamp: Some(prost_types::Timestamp {
                seconds: timestamp,
                nanos: 0,
            }),
            line,
        });
    }
        })
        .collect();

    let req = proto::PushRequest {
        streams: vec![proto::StreamAdapter {
            labels: "{namespace=\"iptables\"}".to_string(),
            entries: entries_transformed,
            entries,
        }],
    };

    // encode the PushRequest
    let encoded_len = req.encoded_len();
    if encode_buf.capacity() < encoded_len {
        encode_buf.reserve(encoded_len);
    }

    req.encode(encode_buf)?;
    let encoded_bytes = encode_buf.split().freeze();

    // compress the PushRequest
    let max_compress_len = max_compress_len(encoded_len);
    if compress_buf.len() < max_compress_len {
        // we resize the buffer instead of reserving capacity because we need
        // the array to be initialized so when we pass the backing `&mut [u8]`
        // the whole capacity is ready to be written into
        compress_buf.resize(max_compress_len, 0);
    }

    let len = snappy.compress(&encoded_bytes, &mut compress_buf[..])?;
    let compressed_bytes = compress_buf.split_to(len).freeze();

    let mut s = BytesMut::new();
    req.encode(&mut s)?;
    let s = s.freeze();

    // TODO: can we do this without the copy?
    Ok(Bytes::from(
        snap::raw::Encoder::new().compress_vec(s.as_ref())?,
    ))
    Ok(compressed_bytes)
}
diff --git a/src/main.rs b/src/main.rs
index 2bd5c1c..1d6b50e 100644
--- a/src/main.rs
+++ a/src/main.rs
@@ -5,6 +5,7 @@
mod parser;

use crate::parser::Log;
use bytes::BytesMut;
use chrono::DateTime;
use chrono::Utc;
use clap::Clap;
@@ -23,6 +24,7 @@
    event::ModifyKind, immediate_watcher, Event, EventKind, RecommendedWatcher, RecursiveMode,
    Watcher,
};
use snap::raw::Encoder as SnappyEncoder;

use anyhow::Result;

@@ -86,24 +88,22 @@
    proto: &'a str,

    asn: Option<u32>,
    asn_org: Option<&'a String>,
    city: Option<&'a String>,
    country_code: Option<&'a String>,
    country: Option<&'a String>,
    asn_org: Option<&'a str>,
    city: Option<&'a str>,
    country_code: Option<&'a str>,
    country: Option<&'a str>,
    lat: Option<f64>,
    lng: Option<f64>,
}
impl<'a> FirewallEntry<'a> {
    fn from(
        log: &'a Log<'a>,
        city: &'a Option<geoip2::City>,
        country: &'a Option<geoip2::Country>,
        asn: &'a Option<geoip2::Asn>,
        city: Option<&geoip2::City<'a>>,
        country: Option<&geoip2::Country<'a>>,
        asn: Option<&geoip2::Asn<'a>>,
    ) -> Result<FirewallEntry<'a>> {
        let country = country
            .as_ref()
            .and_then(|country| country.country.as_ref());
        let location = city.as_ref().and_then(|city| city.location.as_ref());
        let country = country.and_then(|country| country.country.as_ref());
        let location = city.and_then(|city| city.location.as_ref());

        Ok(FirewallEntry {
            time: &log.time,
@@ -118,19 +118,18 @@
            dst_port: log.values.get("DPT").unwrap_or(&"").parse()?,
            proto: log.values.get("PROTO").unwrap_or(&""),

            asn: asn.as_ref().and_then(|asn| asn.autonomous_system_number),
            asn_org: asn
                .as_ref()
                .and_then(|asn| asn.autonomous_system_organization.as_ref()),
            asn: asn.and_then(|asn| asn.autonomous_system_number),
            asn_org: asn.and_then(|asn| asn.autonomous_system_organization),
            city: city
                .as_ref()
                .and_then(|city| city.city.as_ref())
                .and_then(|city| city.names.as_ref())
                .and_then(|names| names.get("en")),
            country_code: country.and_then(|country| country.iso_code.as_ref()),
                .and_then(|names| names.get("en").or_else(|| names.values().next()))
                .copied(),
            country_code: country.and_then(|country| country.iso_code),
            country: country
                .and_then(|country| country.names.as_ref())
                .and_then(|names| names.get("en")),
                .and_then(|names| names.get("en").or_else(|| names.values().next()))
                .copied(),
            lat: location.and_then(|loc| loc.latitude),
            lng: location.and_then(|loc| loc.longitude),
        })
@@ -154,13 +153,13 @@
            self.dst,
            self.dst_port,
            self.proto,
            self.asn.as_ref().map(ToString::to_string).unwrap_or_default(),
            self.asn_org.as_ref().map(ToString::to_string).unwrap_or_default(),
            self.city.as_ref().map(ToString::to_string).unwrap_or_default(),
            self.country_code.as_ref().map(ToString::to_string).unwrap_or_default(),
            self.country.as_ref().map(ToString::to_string).unwrap_or_default(),
            self.lat.as_ref().map(ToString::to_string).unwrap_or_default(),
            self.lng.as_ref().map(ToString::to_string).unwrap_or_default(),
            self.asn.unwrap_or_default(),
            self.asn_org.unwrap_or_default(),
            self.city.unwrap_or_default(),
            self.country_code.unwrap_or_default(),
            self.country.unwrap_or_default(),
            self.lat.unwrap_or_default(),
            self.lng.unwrap_or_default(),
        )
    }
}
@@ -209,9 +208,15 @@
    let city_db = open_geoip_reader(config.geoip.city_db.as_ref());
    let country_db = open_geoip_reader(config.geoip.country_db.as_ref());

    let mut snappy = SnappyEncoder::new();
    let mut encode_buf = BytesMut::new();
    let mut compress_buf = BytesMut::new();

    let mut reader = open_log_file_or_exit();
    let mut buf = String::new();
    let client = reqwest::Client::new();

    let mut entries: Vec<_> = Vec::with_capacity(8);

    loop {
        buf.clear();
@@ -260,33 +265,45 @@
                country_db.as_ref().and_then(|db| db.lookup(ip).ok());

            let timestamp = log_line.time.timestamp();
            let entry = match FirewallEntry::from(&log_line, &city, &country, &asn) {
                Ok(v) => v.to_string(),
                Err(e) => {
                    error!("Failed to build firewall entry for log: {:?}", e);
                    continue;
                }
            };

            let req = match crate::loki::create_push_request(vec![(timestamp, entry)]) {
                Ok(v) => v,
                Err(e) => {
                    error!("Error creating a Loki push request: {:?}", e);
                    continue;
                }
            };

            match client.post(&config.loki.push_url).body(req).send().await {
                Ok(resp) => {
                    if !resp.status().is_success() {
                        error!(
                            "Error pushing log to Loki ({}): {:?}",
                            resp.status(),
                            resp.text().await
                        );
            let entry =
                match FirewallEntry::from(&log_line, city.as_ref(), country.as_ref(), asn.as_ref())
                {
                    Ok(v) => v.to_string(),
                    Err(e) => {
                        error!("Failed to build firewall entry for log: {:?}", e);
                        continue;
                    }
                };

            entries.push((timestamp, entry));

            // once the vec reaches capacity, flush to loki
            if entries.len() == entries.capacity() {
                let req = match crate::loki::create_push_request(
                    &mut snappy,
                    &mut encode_buf,
                    &mut compress_buf,
                    &mut entries,
                ) {
                    Ok(v) => v,
                    Err(e) => {
                        error!("Error creating a Loki push request: {:?}", e);
                        continue;
                    }
                };

                match client.post(&config.loki.push_url).body(req).send().await {
                    Ok(resp) => {
                        if !resp.status().is_success() {
                            error!(
                                "Error pushing log to Loki ({}): {:?}",
                                resp.status(),
                                resp.text().await
                            );
                        }
                    }
                    Err(e) => error!("Error pushing log to Loki: {:?}", e),
                }
                Err(e) => error!("Error pushing log to Loki: {:?}", e),
            }
        }
    }