#![deny(clippy::all, clippy::pedantic)] mod config; mod loki; mod parser; use crate::parser::Log; use bytes::BytesMut; use chrono::DateTime; use chrono::Utc; use clap::Clap; use std::fmt::Display; use std::fs::File; use std::io::{BufRead, BufReader}; use std::net::IpAddr; use std::path::Path; use std::time::Duration; use log::{debug, error}; use crossbeam_channel::Receiver; use maxminddb::geoip2; use notify::{ event::ModifyKind, immediate_watcher, Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher, }; use snap::raw::Encoder as SnappyEncoder; use anyhow::Result; enum ModifyType { Data, Rotate, } struct LogReader { _watcher: RecommendedWatcher, reader: BufReader, recv: Receiver, } fn open_log_file(path: &Path) -> Result { while !path.exists() { std::thread::sleep(Duration::new(1, 0)); } // setup an inotify watcher and forward events on to the channel let (send, recv) = crossbeam_channel::bounded(0); let mut watcher = immediate_watcher(move |e| match e { Ok(Event { kind: EventKind::Modify(ModifyKind::Data(_)), .. }) => { // we don't really care if the receiever isn't listening on the channel let _ = send.try_send(ModifyType::Data); } Ok(Event { kind: EventKind::Modify(ModifyKind::Name(_)), .. }) => { let _ = send.try_send(ModifyType::Rotate); } Ok(Event { .. }) => {} Err(e) => error!("Error watching file: {:?}", e), })?; watcher.watch(path, RecursiveMode::NonRecursive)?; Ok(LogReader { _watcher: watcher, reader: BufReader::new(File::open(path)?), recv, }) } #[derive(Debug)] struct FirewallEntry<'a> { time: &'a DateTime, hostname: &'a str, rule: &'a str, interface: &'a str, mac: &'a str, src: IpAddr, src_port: u16, dst: IpAddr, dst_port: u16, proto: &'a str, asn: Option, asn_org: Option<&'a str>, city: Option<&'a str>, country_code: Option<&'a str>, country: Option<&'a str>, lat: Option, lng: Option, } impl<'a> FirewallEntry<'a> { fn from( log: &'a Log<'a>, city: Option<&geoip2::City<'a>>, country: Option<&geoip2::Country<'a>>, asn: Option<&geoip2::Asn<'a>>, ) -> Result> { let country = country.and_then(|country| country.country.as_ref()); let location = city.and_then(|city| city.location.as_ref()); Ok(FirewallEntry { time: &log.time, hostname: log.hostname, rule: log.rule, interface: log.values.get("IN").unwrap_or(&""), mac: log.values.get("MAC").unwrap_or(&""), src: log.values.get("SRC").unwrap_or(&"").parse()?, src_port: log.values.get("SPT").unwrap_or(&"").parse()?, dst: log.values.get("DST").unwrap_or(&"").parse()?, dst_port: log.values.get("DPT").unwrap_or(&"").parse()?, proto: log.values.get("PROTO").unwrap_or(&""), asn: asn.and_then(|asn| asn.autonomous_system_number), asn_org: asn.and_then(|asn| asn.autonomous_system_organization), city: city .and_then(|city| city.city.as_ref()) .and_then(|city| city.names.as_ref()) .and_then(|names| names.get("en").or_else(|| names.values().next())) .copied(), country_code: country.and_then(|country| country.iso_code), country: country .and_then(|country| country.names.as_ref()) .and_then(|names| names.get("en").or_else(|| names.values().next())) .copied(), lat: location.and_then(|loc| loc.latitude), lng: location.and_then(|loc| loc.longitude), }) } } impl Display for FirewallEntry<'_> { fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::result::Result<(), std::fmt::Error> { write!( fmt, concat!( "hostname=\"{}\" rule=\"{}\" interface=\"{}\" mac=\"{}\" src=\"{}\" src_port=\"{}\" ", "dst=\"{}\" dst_port=\"{}\" proto=\"{}\" asn=\"{}\" asn_org=\"{}\" city=\"{}\" ", "country_code=\"{}\" country=\"{}\" lat=\"{}\" lng=\"{}\"", ), self.hostname, self.rule, self.interface, self.mac, self.src, self.src_port, self.dst, self.dst_port, self.proto, self.asn.unwrap_or_default(), self.asn_org.unwrap_or_default(), self.city.unwrap_or_default(), self.country_code.unwrap_or_default(), self.country.unwrap_or_default(), self.lat.unwrap_or_default(), self.lng.unwrap_or_default(), ) } } #[tokio::main] async fn main() { env_logger::init(); let args: config::Args = config::Args::parse(); let config = match config::Config::load(&args) { Ok(v) => v, Err(e) => { error!("Failed to load config file: {}", e); std::process::exit(1); } }; if !config.log_file.exists() { error!("Log file '{}' does not exist.", config.log_file.display()); std::process::exit(1); } let open_log_file_or_exit = || match open_log_file(&config.log_file) { Ok(v) => v, Err(e) => { error!( "Failed to watch log file '{}': {:?}", config.log_file.display(), e ); std::process::exit(1); } }; let open_geoip_reader = |v: Option<&std::path::PathBuf>| { v.map(|v| match maxminddb::Reader::open_mmap(v) { Ok(db) => db, Err(e) => { error!("Failed to load GeoIP DB '{}': {}", v.display(), e); std::process::exit(1); } }) }; let asn_db = open_geoip_reader(config.geoip.asn_db.as_ref()); let city_db = open_geoip_reader(config.geoip.city_db.as_ref()); let country_db = open_geoip_reader(config.geoip.country_db.as_ref()); let mut snappy = SnappyEncoder::new(); let mut encode_buf = BytesMut::new(); let mut compress_buf = BytesMut::new(); let mut reader = open_log_file_or_exit(); let mut buf = String::new(); let client = reqwest::Client::new(); let mut entries: Vec<_> = Vec::with_capacity(8); loop { buf.clear(); let read_bytes = match reader.reader.read_line(&mut buf) { Ok(v) => v, Err(e) => { error!("Failed to read line from file: {:?}", e); std::thread::sleep(Duration::new(1, 0)); continue; } }; if read_bytes == 0 { // block until we receive a notification from inotify match reader.recv.recv() { Ok(ModifyType::Data) => {} Ok(ModifyType::Rotate) => reader = open_log_file_or_exit(), // crossbeam channel disconnected, wait a second before polling file again Err(_) => std::thread::sleep(Duration::new(1, 0)), } continue; } let (_, log_line) = match parser::parse_log_line(&buf) { Ok((_, v)) if !config.firewall.rules.iter().any(|rule| v.rule == rule) => { debug!("Non-matching firewall rule: {}", v.rule); continue; } Err(e) => { debug!("Non-matching log line: {:?}", e); continue; } Ok(v) => v, }; if let Some(ip) = log_line.values.get("SRC") { let ip = if let Ok(ip) = ip.parse() { ip } else { error!("Malformed src ip in iptables logs {}", ip); continue; }; let asn: Option = asn_db.as_ref().and_then(|db| db.lookup(ip).ok()); let city: Option = city_db.as_ref().and_then(|db| db.lookup(ip).ok()); let country: Option = country_db.as_ref().and_then(|db| db.lookup(ip).ok()); let timestamp = log_line.time.timestamp(); let entry = match FirewallEntry::from(&log_line, city.as_ref(), country.as_ref(), asn.as_ref()) { Ok(v) => v.to_string(), Err(e) => { error!("Failed to build firewall entry for log: {:?}", e); continue; } }; entries.push((timestamp, entry)); // once the vec reaches capacity, flush to loki if entries.len() == entries.capacity() { let req = match crate::loki::create_push_request( &mut snappy, &mut encode_buf, &mut compress_buf, &mut entries, ) { Ok(v) => v, Err(e) => { error!("Error creating a Loki push request: {:?}", e); continue; } }; match client.post(&config.loki.push_url).body(req).send().await { Ok(resp) => { if !resp.status().is_success() { error!( "Error pushing log to Loki ({}): {:?}", resp.status(), resp.text().await ); } } Err(e) => error!("Error pushing log to Loki: {:?}", e), } } } } }