Fix clean shutdowns with long-running clients
Diff
src/audit.rs | 14 ++++++++++++--
src/main.rs | 24 +++++++++++++++++++++---
2 files changed, 28 insertions(+), 10 deletions(-)
@@ -11,7 +11,7 @@
use tokio::{
fs::OpenOptions,
io::{AsyncWriteExt, BufWriter},
sync::watch,
sync::{oneshot, watch},
task::JoinHandle,
};
use tracing::{debug, info};
@@ -20,6 +20,7 @@
pub fn start_audit_writer(
config: Arc<Config>,
mut reload: watch::Receiver<()>,
mut shutdown_recv: oneshot::Receiver<()>,
) -> (
tokio::sync::mpsc::UnboundedSender<AuditLog>,
JoinHandle<Result<(), std::io::Error>>,
@@ -39,9 +40,9 @@
let mut writer = open_writer().await?;
let mut shutdown = false;
loop {
while !shutdown {
tokio::select! {
log = recv.recv(), if !shutdown => {
log = recv.recv() => {
match log {
Some(log) => {
let log = serde_json::to_vec(&log)
@@ -54,11 +55,14 @@
}
}
}
_ = tokio::time::sleep(Duration::from_secs(5)), if !writer.buffer().is_empty() && !shutdown => {
_ = &mut shutdown_recv => {
shutdown = true;
}
_ = tokio::time::sleep(Duration::from_secs(5)), if !writer.buffer().is_empty() => {
debug!("Flushing audits to disk");
writer.flush().await?;
}
Ok(()) = reload.changed(), if !shutdown => {
Ok(()) = reload.changed() => {
info!("Flushing audits to disk");
writer.flush().await?;
@@ -6,7 +6,10 @@
use futures::FutureExt;
use std::sync::Arc;
use thrussh::MethodSet;
use tokio::{signal::unix::SignalKind, sync::watch};
use tokio::{
signal::unix::SignalKind,
sync::{oneshot, watch},
};
use tracing::{error, info};
use tracing_subscriber::EnvFilter;
@@ -49,29 +52,40 @@
});
let (reload_send, reload_recv) = watch::channel(());
let (shutdown_send, shutdown_recv) = oneshot::channel();
let (audit_send, audit_handle) = audit::start_audit_writer(args.config.clone(), reload_recv);
let (audit_send, audit_handle) =
audit::start_audit_writer(args.config.clone(), reload_recv, shutdown_recv);
let mut audit_handle = audit_handle.fuse();
let server = Server::new(args.config.clone(), audit_send);
let listen_address = args.config.listen_address.to_string();
let fut = thrussh::server::run(thrussh_config, &listen_address, server);
let shutdown_watcher = watch_for_shutdown(shutdown_send);
let reload_watcher = watch_for_reloads(reload_send);
tokio::select! {
res = fut => res?,
res = &mut audit_handle => res??,
res = shutdown_watcher => res?,
res = reload_watcher => res?,
_ = tokio::signal::ctrl_c() => {
info!("Received ctrl-c, initiating shutdown");
}
}
info!("Finishing audit log writes");
audit_handle.await??;
info!("Audit log writes finished");
Ok(())
}
async fn watch_for_shutdown(send: oneshot::Sender<()>) -> Result<(), anyhow::Error> {
tokio::signal::ctrl_c().await?;
info!("Received ctrl-c, initiating shutdown");
let _res = send.send(());
Ok(())
}