// Repository index: ~doyle/pisshoff.git

use std::{io::ErrorKind, sync::Arc, time::Duration};

pub use pisshoff_types::audit::*;
use tokio::{
    fs::OpenOptions,
    io::{AsyncWriteExt, BufWriter},
    sync::{oneshot, watch},
    task::JoinHandle,
};
use tracing::{debug, info};

use crate::config::Config;

pub fn start_audit_writer(
    config: Arc<Config>,
    mut reload: watch::Receiver<()>,
    mut shutdown_recv: oneshot::Receiver<()>,
) -> (
    tokio::sync::mpsc::UnboundedSender<AuditLog>,
    JoinHandle<Result<(), std::io::Error>>,
) {
    let (send, mut recv) = tokio::sync::mpsc::unbounded_channel();

    let handle = tokio::spawn(async move {
        let open_writer = || async {
            let file = OpenOptions::default()
                .create(true)
                .append(true)
                .open(&config.audit_output_file)
                .await?;
            Ok::<_, std::io::Error>(BufWriter::new(file))
        };

        let mut writer = open_writer().await?;
        let mut shutdown = false;

        while !shutdown {
            tokio::select! {
                log = recv.recv() => {
                    match log {
                        Some(log) => {
                            let log = serde_json::to_vec(&log)
                                .map_err(|e| std::io::Error::new(ErrorKind::Other, e))?;
                            writer.write_all(&log).await?;
                            writer.write_all("\n".as_bytes()).await?;
                        }
                        None => {
                            shutdown = true;
                        }
                    }
                }
                _ = &mut shutdown_recv => {
                    shutdown = true;
                }
                () = tokio::time::sleep(Duration::from_secs(5)), if !writer.buffer().is_empty() => {
                    debug!("Flushing audits to disk");
                    writer.flush().await?;
                }
                Ok(()) = reload.changed() => {
                    info!("Flushing audits to disk");
                    writer.flush().await?;

                    info!("Reopening handle to log file");
                    writer = open_writer().await?;

                    info!("Successfully re-opened log file");
                }
                else => break,
            }
        }

        writer.flush().await?;

        Ok(())
    });

    (send, handle)
}