Allow configurable indexing intervals
This change allows refresh intervals to be changed from the fixed 5
minutes that was set prior to this change. Automatic indexing can also
be disabled entirely and instead triggered via a SIGHUP.
Fixes #21
Diff
Cargo.lock | 7 +++++++
Cargo.toml | 1 +
src/main.rs | 123 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++------------
3 files changed, 110 insertions(+), 21 deletions(-)
@@ -1106,6 +1106,12 @@
]
[[package]]
name = "humantime"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4"
[[package]]
name = "hyper"
version = "0.14.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1845,6 +1851,7 @@
"git2",
"hex",
"httparse",
"humantime",
"md5",
"moka",
"nom",
@@ -21,6 +21,7 @@
futures = "0.3"
git2 = "0.17.2"
hex = "0.4"
humantime = "2.1"
nom = "7.1"
md5 = "0.7"
moka = { version = "0.11.1", features = ["future"] }
@@ -1,8 +1,13 @@
#![deny(clippy::pedantic)]
use std::net::SocketAddr;
use std::path::PathBuf;
use std::{sync::Arc, time::Duration};
use std::{
fmt::{Display, Formatter},
net::SocketAddr,
path::PathBuf,
str::FromStr,
sync::Arc,
time::Duration,
};
use askama::Template;
use axum::{
@@ -15,10 +20,15 @@
};
use bat::assets::HighlightingAssets;
use clap::Parser;
use sled::Db;
use syntect::html::ClassStyle;
use tokio::{
signal::unix::{signal, SignalKind},
sync::mpsc,
};
use tower_http::cors::CorsLayer;
use tower_layer::layer_fn;
use tracing::{info, instrument};
use tracing::{error, info, instrument};
use crate::{git::Git, layers::logger::LoggingMiddleware};
@@ -42,10 +52,42 @@
bind_address: SocketAddr,
scan_path: PathBuf,
#[clap(long, default_value_t = RefreshInterval::Duration(Duration::from_secs(300)))]
refresh_interval: RefreshInterval,
}
#[derive(Debug, Clone, Copy)]
pub enum RefreshInterval {
Never,
Duration(Duration),
}
impl Display for RefreshInterval {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
Self::Never => write!(f, "never"),
Self::Duration(s) => write!(f, "{}", humantime::format_duration(*s)),
}
}
}
impl FromStr for RefreshInterval {
type Err = &'static str;
fn from_str(s: &str) -> Result<Self, Self::Err> {
if s == "never" {
Ok(Self::Never)
} else if let Ok(v) = humantime::parse_duration(s) {
Ok(Self::Duration(v))
} else {
Err("must be seconds, a human readable duration (eg. '10m') or 'never'")
}
}
}
#[tokio::main]
async fn main() {
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let args: Args = Args::parse();
let subscriber = tracing_subscriber::fmt();
@@ -58,20 +100,10 @@
.path(&args.db_store)
.open()
.unwrap();
std::thread::spawn({
let db = db.clone();
let scan_path = args.scan_path.clone();
move || loop {
info!("Running periodic index");
crate::database::indexer::run(&scan_path, &db);
info!("Finished periodic index");
let indexer_wakeup_task =
run_indexer(db.clone(), args.scan_path.clone(), args.refresh_interval);
std::thread::sleep(Duration::from_secs(300));
}
});
let bat_assets = HighlightingAssets::from_binary();
let syntax_set = bat_assets.get_syntax_set().unwrap().clone();
@@ -136,10 +168,59 @@
.layer(Extension(Arc::new(args.scan_path)))
.layer(CorsLayer::new());
axum::Server::bind(&args.bind_address)
.serve(app.into_make_service_with_connect_info::<std::net::SocketAddr>())
.await
.unwrap();
let server = axum::Server::bind(&args.bind_address)
.serve(app.into_make_service_with_connect_info::<SocketAddr>());
tokio::select! {
res = server => res.map_err(Box::from),
res = indexer_wakeup_task => res.map_err(Box::from),
_ = tokio::signal::ctrl_c() => {
info!("Received ctrl-c, shutting down");
Ok(())
}
}
}
async fn run_indexer(
db: Db,
scan_path: PathBuf,
refresh_interval: RefreshInterval,
) -> Result<(), tokio::task::JoinError> {
let (indexer_wakeup_send, mut indexer_wakeup_recv) = mpsc::channel(10);
std::thread::spawn(move || loop {
info!("Running periodic index");
crate::database::indexer::run(&scan_path, &db);
info!("Finished periodic index");
if indexer_wakeup_recv.blocking_recv().is_none() {
break;
}
});
tokio::spawn({
let mut sighup = signal(SignalKind::hangup()).expect("could not subscribe to sighup");
let build_sleeper = move || async move {
match refresh_interval {
RefreshInterval::Never => futures::future::pending().await,
RefreshInterval::Duration(v) => tokio::time::sleep(v).await,
};
};
async move {
loop {
tokio::select! {
_ = sighup.recv() => {},
_ = build_sleeper() => {},
}
if indexer_wakeup_send.send(()).await.is_err() {
error!("Indexing thread has died and is no longer accepting wakeup messages");
}
}
}
})
.await
}
#[instrument(skip(t))]