use std::{io::ErrorKind, sync::Arc, time::Duration}; pub use pisshoff_types::audit::*; use tokio::{ fs::OpenOptions, io::{AsyncWriteExt, BufWriter}, sync::{oneshot, watch}, task::JoinHandle, }; use tracing::{debug, info}; use crate::config::Config; pub fn start_audit_writer( config: Arc, mut reload: watch::Receiver<()>, mut shutdown_recv: oneshot::Receiver<()>, ) -> ( tokio::sync::mpsc::UnboundedSender, JoinHandle>, ) { let (send, mut recv) = tokio::sync::mpsc::unbounded_channel(); let handle = tokio::spawn(async move { let open_writer = || async { let file = OpenOptions::default() .create(true) .append(true) .open(&config.audit_output_file) .await?; Ok::<_, std::io::Error>(BufWriter::new(file)) }; let mut writer = open_writer().await?; let mut shutdown = false; while !shutdown { tokio::select! { log = recv.recv() => { match log { Some(log) => { let log = serde_json::to_vec(&log) .map_err(|e| std::io::Error::new(ErrorKind::Other, e))?; writer.write_all(&log).await?; writer.write_all("\n".as_bytes()).await?; } None => { shutdown = true; } } } _ = &mut shutdown_recv => { shutdown = true; } () = tokio::time::sleep(Duration::from_secs(5)), if !writer.buffer().is_empty() => { debug!("Flushing audits to disk"); writer.flush().await?; } Ok(()) = reload.changed() => { info!("Flushing audits to disk"); writer.flush().await?; info!("Reopening handle to log file"); writer = open_writer().await?; info!("Successfully re-opened log file"); } else => break, } } writer.flush().await?; Ok(()) }); (send, handle) }