Store eligibility and metadata caches in rocksdb
Also renames the checksum cache to eligibility cache since it also now
supports yanking. There will be a follow-up change to add a `bust-cache`
SSH command to replace the delayed caching that was previously in place.
Diff
CHANGELOG.md | 4 +++-
Cargo.lock | 266 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Cargo.toml | 6 ++++++
src/cache.rs | 311 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
src/config.rs | 11 +++++++++++
src/main.rs | 111 ++++++++++++++++++++++++++++++++++++++++++++++++++++----------------------------
src/metadata.rs | 59 +++++++++++++++++++++++++++++++++++++++++++----------------
src/providers/gitlab.rs | 172 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++------
src/providers/mod.rs | 56 ++++++++++++++++++++++++++++++++++++++++++++++++++------
src/providers/gitlab/checksums.rs | 40 ----------------------------------------
10 files changed, 828 insertions(+), 208 deletions(-)
@@ -6,7 +6,9 @@
- Add info logs for release & metadata fetch latency.
- When fetching all releases handle 429 by backing off.
- Improve fetch error logging.
- Add file checksum fetch caching controlled by `cache-releases-older-than` config.
- Add crate eligibility cache.
- Introduce configurable cache backend with a RocksDB implementation (set `cache.type = "rocksdb"` and `cache.path = "cache"` to use it), defaults to `cache.type = "in-memory"`.
- Support crate yanking by creating a `yanked` file on the release.
# v0.1.4
@@ -179,6 +179,45 @@
]
[[package]]
name = "bincode"
version = "2.0.0-rc.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f11ea1a0346b94ef188834a65c068a03aec181c94896d481d7a0a40d85b0ce95"
dependencies = [
"bincode_derive",
"serde",
]
[[package]]
name = "bincode_derive"
version = "2.0.0-rc.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7e30759b3b99a1b802a7a3aa21c85c3ded5c28e1c83170d82d70f08bbf7f3e4c"
dependencies = [
"virtue",
]
[[package]]
name = "bindgen"
version = "0.69.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a00dc851838a2120612785d195287475a3ac45514741da670b735818822129a0"
dependencies = [
"bitflags 2.4.2",
"cexpr",
"clang-sys",
"itertools",
"lazy_static",
"lazycell",
"proc-macro2",
"quote",
"regex",
"rustc-hash",
"shlex",
"syn",
]
[[package]]
name = "bit-vec"
version = "0.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -257,6 +296,17 @@
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a2bd12c1caf447e69cd4528f47f94d203fd2582878ecb9e9465484c4148a8223"
[[package]]
name = "bzip2-sys"
version = "0.1.11+1.0.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "736a955f3fa7875102d57c82b8cac37ec45224a07fd32d58f9f7a186b6cd4cdc"
dependencies = [
"cc",
"libc",
"pkg-config",
]
[[package]]
name = "camino"
@@ -297,6 +347,15 @@
checksum = "7f9fa1897e4325be0d68d48df6aa1a71ac2ed4d27723887e7754192705350730"
dependencies = [
"libc",
]
[[package]]
name = "cexpr"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6fac387a98bb7c37292057cffc56d62ecb629900026402633ae9160df93a8766"
dependencies = [
"nom",
]
[[package]]
@@ -322,6 +381,17 @@
dependencies = [
"crypto-common",
"inout",
]
[[package]]
name = "clang-sys"
version = "1.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "67523a3b4be3ce1989d607a828d036249522dd9c1c8de7f4dd2dae43a37369d1"
dependencies = [
"glob",
"libc",
"libloading",
]
[[package]]
@@ -499,6 +569,12 @@
"redox_users",
"winapi",
]
[[package]]
name = "either"
version = "1.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "11157ac094ffbdde99aa67b23417ebdd801842852b500e395a45a9c0aac03e4a"
[[package]]
name = "encoding_rs"
@@ -526,6 +602,12 @@
]
[[package]]
name = "fastrand"
version = "2.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "25cbce373ec4653f1a01a31e8a5e5ec0c622dc27ff9c4e6606eefef5cbbed4a5"
[[package]]
name = "flate2"
version = "1.0.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -674,6 +756,7 @@
"arrayvec",
"async-trait",
"backoff",
"bincode",
"bytes",
"cargo-platform",
"cargo_metadata",
@@ -689,11 +772,13 @@
"parse_link_header",
"percent-encoding",
"reqwest",
"rocksdb",
"semver",
"serde",
"serde_json",
"shlex",
"smol_str",
"tempfile",
"thrussh",
"thrussh-keys",
"thrussh-libsodium",
@@ -707,8 +792,15 @@
"urlencoding",
"ustr",
"uuid",
"yoke",
"zstd",
]
[[package]]
name = "glob"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
[[package]]
name = "h2"
@@ -900,6 +992,15 @@
version = "2.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f518f335dce6725a761382244631d86cf0ccb2863413590b31338feb467f9c3"
[[package]]
name = "itertools"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba291022dbbd398a455acf126c1e341954079855bc60dfdda641363bd6922569"
dependencies = [
"either",
]
[[package]]
name = "itoa"
@@ -921,12 +1022,28 @@
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
[[package]]
name = "lazycell"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55"
[[package]]
name = "libc"
version = "0.2.153"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c198f91728a82281a64e1f4f9eeb25d82cb32a5de251c6bd1b5154d63a8e7bd"
[[package]]
name = "libloading"
version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c571b676ddfc9a8c12f1f3d3085a7b163966a8fd8098a90640953ce5f6170161"
dependencies = [
"cfg-if",
"windows-sys 0.48.0",
]
[[package]]
name = "libredox"
@@ -940,6 +1057,22 @@
]
[[package]]
name = "librocksdb-sys"
version = "0.16.0+8.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ce3d60bc059831dc1c83903fb45c103f75db65c5a7bf22272764d9cc683e348c"
dependencies = [
"bindgen",
"bzip2-sys",
"cc",
"glob",
"libc",
"libz-sys",
"lz4-sys",
"zstd-sys",
]
[[package]]
name = "libsodium-sys"
version = "0.2.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -949,6 +1082,17 @@
"libc",
"pkg-config",
"walkdir",
]
[[package]]
name = "libz-sys"
version = "1.1.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "037731f5d3aaa87a5675e895b63ddff1a87624bc29f77004ea829809654e48f6"
dependencies = [
"cc",
"pkg-config",
"vcpkg",
]
[[package]]
@@ -972,6 +1116,16 @@
version = "0.4.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f"
[[package]]
name = "lz4-sys"
version = "1.9.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "57d27b317e207b10f69f5e75494119e391a96f48861ae870d1da6edac98ca900"
dependencies = [
"cc",
"libc",
]
[[package]]
name = "matchers"
@@ -999,6 +1153,12 @@
version = "0.3.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a"
[[package]]
name = "minimal-lexical"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a"
[[package]]
name = "miniz_oxide"
@@ -1018,6 +1178,16 @@
"libc",
"wasi",
"windows-sys 0.48.0",
]
[[package]]
name = "nom"
version = "7.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d273983c5a657a70a3e8f2a01329822f3b8c8172b73826411a55751e404a0a4a"
dependencies = [
"memchr",
"minimal-lexical",
]
[[package]]
@@ -1389,6 +1559,16 @@
"spin",
"untrusted",
"windows-sys 0.52.0",
]
[[package]]
name = "rocksdb"
version = "0.22.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6bd13e55d6d7b8cd0ea569161127567cd587676c99f4472f779a0279aa60a7a7"
dependencies = [
"libc",
"librocksdb-sys",
]
[[package]]
@@ -1396,6 +1576,12 @@
version = "0.1.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76"
[[package]]
name = "rustc-hash"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2"
[[package]]
name = "rustix"
@@ -1624,6 +1810,12 @@
checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67"
[[package]]
name = "stable_deref_trait"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3"
[[package]]
name = "strsim"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1651,6 +1843,17 @@
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160"
[[package]]
name = "synstructure"
version = "0.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c8af7666ab7b6390ab78131fb5b0fce11d6b7a6951602017c35fa82800708971"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "system-configuration"
@@ -1671,6 +1874,18 @@
dependencies = [
"core-foundation-sys",
"libc",
]
[[package]]
name = "tempfile"
version = "3.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a365e8cd18e44762ef95d87f284f4b5cd04107fec2ff3052bd6a3e6069669e67"
dependencies = [
"cfg-if",
"fastrand",
"rustix",
"windows-sys 0.52.0",
]
[[package]]
@@ -2067,6 +2282,12 @@
version = "0.9.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f"
[[package]]
name = "virtue"
version = "0.0.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9dcc60c0624df774c82a0ef104151231d37da4962957d691c011c852b2473314"
[[package]]
name = "walkdir"
@@ -2356,6 +2577,30 @@
dependencies = [
"bit-vec",
"num-bigint",
]
[[package]]
name = "yoke"
version = "0.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "65e71b2e4f287f467794c671e2b8f8a5f3716b3c829079a1c44740148eff07e4"
dependencies = [
"serde",
"stable_deref_trait",
"yoke-derive",
"zerofrom",
]
[[package]]
name = "yoke-derive"
version = "0.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e6936f0cce458098a201c245a11bef556c6a0181129c7034d10d76d1ec3a2b8"
dependencies = [
"proc-macro2",
"quote",
"syn",
"synstructure",
]
[[package]]
@@ -2372,10 +2617,31 @@
version = "0.7.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9ce1b18ccd8e73a9321186f97e46f9f04b778851177567b1975109d26a08d2a6"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "zerofrom"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "655b0814c5c0b19ade497851070c640773304939a6c0fd5f5fb43da0696d05b7"
dependencies = [
"zerofrom-derive",
]
[[package]]
name = "zerofrom-derive"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6a647510471d372f2e6c2e6b7219e44d8c574d24fdc11c610a61455782f18c3"
dependencies = [
"proc-macro2",
"quote",
"syn",
"synstructure",
]
[[package]]
@@ -9,6 +9,7 @@
arrayvec = "0.7"
async-trait = "0.1"
backoff = "0.4"
bincode = { version = "2.0.0-rc.3", features = ["serde"] }
bytes = "1.1"
cargo_metadata = "0.15"
cargo-platform = "0.1"
@@ -24,6 +25,7 @@
parse_link_header = "0.3"
percent-encoding = "2.3"
reqwest = { version = "0.11", default-features = false, features = ["json", "rustls-tls"] }
rocksdb = "0.22"
semver = "1.0"
serde = { version = "1.0", features = ["derive", "rc"] }
serde_json = "1"
@@ -42,7 +44,11 @@
urlencoding = "2.1"
ustr = "0.10"
uuid = { version = "1.1", features = ["v4"] }
yoke = { version = "0.7", features = ["derive"] }
zstd = "0.13"
[dev-dependencies]
tempfile = "3.10"
[profile.release]
lto = "thin"
@@ -1,0 +1,311 @@
use std::collections::HashMap;
use std::{
io::{Error, ErrorKind},
path::Path,
sync::Arc,
};
use async_trait::async_trait;
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use yoke::{Yoke, Yokeable};
use crate::config::{CacheStore, Config};
/// Bincode configuration shared by the cache backends in this module; the same
/// value is used for both encoding and decoding cached entries.
pub const BINCODE_CONFIG: bincode::config::Configuration = bincode::config::standard();
/// A value that can be stored in a [`Cache`].
///
/// Implementors choose the key type they are looked up by and the
/// [`CacheKind`] they belong to. The byte key handed to the backend is the
/// kind's discriminant followed by whatever [`Cacheable::format_key`] appends.
pub trait Cacheable: Serialize + Send + for<'a> Yokeable<'a> {
    /// Key used to address a cached value of this type.
    type Key<'a>: Send + 'a;

    /// Tag written as the first byte of every key built for this type.
    const KIND: CacheKind;

    /// Builds the full byte key for `k`: the `KIND` byte first, then the
    /// type-specific part produced by [`Cacheable::format_key`].
    fn build_key(k: Self::Key<'_>) -> Vec<u8> {
        let mut bytes = vec![Self::KIND as u8];
        Self::format_key(&mut bytes, k);
        bytes
    }

    /// Appends the type-specific portion of the key for `k` to `out`.
    fn format_key(out: &mut Vec<u8>, k: Self::Key<'_>);
}
/// Identifies which kind of value a cache entry holds.
///
/// The discriminant is cast to `u8` and written as the first byte of every
/// cache key, so the numeric values must stay stable for as long as existing
/// persisted entries need to remain readable.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum CacheKind {
    /// Release eligibility entries.
    Eligibility = 1,
    /// Crate index metadata entries.
    CrateMetadata = 2,
}
/// Cache backend chosen at runtime; each variant wraps one concrete [`Cache`]
/// implementation.
#[derive(Clone)]
pub enum ConcreteCache {
/// Cache stored in a RocksDB database.
RocksDb(RocksDb),
/// Cache held in an in-process map.
InMemory(InMemory),
}
impl ConcreteCache {
    /// Builds the backend selected by `config.cache`.
    ///
    /// # Errors
    ///
    /// Returns the error from [`RocksDb::new`] when the RocksDB backend is
    /// selected and cannot be opened. The in-memory backend cannot fail.
    pub fn new(config: &Config) -> Result<Self, Error> {
        let backend = match &config.cache {
            CacheStore::InMemory => Self::InMemory(InMemory::default()),
            CacheStore::RocksDb { path } => Self::RocksDb(RocksDb::new(path)?),
        };
        Ok(backend)
    }
}
/// Forwards every [`Cache`] operation to whichever backend this value wraps.
#[async_trait]
impl Cache for ConcreteCache {
    async fn put<C: Cacheable + Sync>(&self, key: C::Key<'_>, value: &C) -> Result<(), Error> {
        match self {
            Self::InMemory(memory) => memory.put::<C>(key, value).await,
            Self::RocksDb(rocks) => rocks.put::<C>(key, value).await,
        }
    }

    async fn get<C: Cacheable + 'static>(
        &self,
        key: C::Key<'_>,
    ) -> Result<Option<Yoke<C, Vec<u8>>>, Error>
    where
        for<'a> <C as Yokeable<'a>>::Output: Deserialize<'a>,
    {
        match self {
            Self::InMemory(memory) => memory.get::<C>(key).await,
            Self::RocksDb(rocks) => rocks.get::<C>(key).await,
        }
    }

    async fn remove<C: Cacheable>(&self, key: C::Key<'_>) -> Result<(), Error> {
        match self {
            Self::InMemory(memory) => memory.remove::<C>(key).await,
            Self::RocksDb(rocks) => rocks.remove::<C>(key).await,
        }
    }
}
/// Asynchronous key/value store for [`Cacheable`] values.
///
/// Keys are turned into bytes with [`Cacheable::build_key`], so entries of
/// different [`CacheKind`]s start with different prefix bytes.
#[async_trait]
pub trait Cache {
/// Stores `value` under `key`. The implementations in this module overwrite
/// any entry already present for that key.
async fn put<C: Cacheable + Sync>(&self, key: C::Key<'_>, value: &C) -> Result<(), Error>;
/// Looks up the entry for `key`.
///
/// Returns `Ok(None)` when nothing is stored under the key. On a hit the
/// returned [`Yoke`] carries the stored bytes as its cart together with the
/// value decoded from them.
async fn get<C: Cacheable + 'static>(
&self,
key: C::Key<'_>,
) -> Result<Option<Yoke<C, Vec<u8>>>, Error>
where
for<'a> <C as Yokeable<'a>>::Output: Deserialize<'a>;
/// Deletes the entry stored under `key`.
async fn remove<C: Cacheable>(&self, key: C::Key<'_>) -> Result<(), Error>;
}
/// In-process [`Cache`] backend.
///
/// The map sits behind an `Arc`, so clones of this value share the same
/// entries.
#[derive(Clone, Default)]
// The nested `Arc<RwLock<HashMap<..>>>` field type trips `type_complexity`.
#[allow(clippy::type_complexity)]
pub struct InMemory {
// Built key bytes -> bincode-encoded value bytes.
db: Arc<RwLock<HashMap<Box<[u8]>, Box<[u8]>>>>,
}
#[async_trait]
impl Cache for InMemory {
    /// Encodes `value` with bincode and inserts it under the key built for
    /// `key`, replacing any entry already stored there.
    async fn put<C: Cacheable + Sync>(&self, key: C::Key<'_>, value: &C) -> Result<(), Error> {
        let payload = bincode::serde::encode_to_vec(value, BINCODE_CONFIG)
            .map_err(|err| Error::new(ErrorKind::Other, err))?;
        let raw_key = C::build_key(key);

        self.db
            .write()
            .insert(raw_key.into_boxed_slice(), payload.into_boxed_slice());

        Ok(())
    }

    /// Returns the decoded entry for `key`, or `Ok(None)` when the map has no
    /// entry for it.
    async fn get<C: Cacheable + 'static>(
        &self,
        key: C::Key<'_>,
    ) -> Result<Option<Yoke<C, Vec<u8>>>, Error>
    where
        for<'a> <C as Yokeable<'a>>::Output: Deserialize<'a>,
    {
        let raw_key = C::build_key(key);

        // Copy the bytes out so the read guard is released at the end of this
        // statement and the returned `Yoke` owns its own buffer.
        let stored = self
            .db
            .read()
            .get(raw_key.as_slice())
            .map(|bytes| bytes.to_vec());

        match stored {
            None => Ok(None),
            Some(bytes) => Yoke::try_attach_to_cart(bytes, |cart| {
                bincode::serde::decode_borrowed_from_slice(cart, BINCODE_CONFIG)
            })
            .map(Some)
            .map_err(|err| Error::new(ErrorKind::Other, err)),
        }
    }

    /// Drops the entry for `key`; succeeds whether or not one was present.
    async fn remove<C: Cacheable>(&self, key: C::Key<'_>) -> Result<(), Error> {
        let raw_key = C::build_key(key);
        self.db.write().remove(raw_key.as_slice());
        Ok(())
    }
}
/// [`Cache`] backend stored in a RocksDB database.
///
/// The database handle sits behind an `Arc`, so clones of this value share
/// the same open database.
#[derive(Clone)]
pub struct RocksDb {
// Shared handle; cloned into each blocking task that touches the database.
rocks: Arc<rocksdb::DB>,
}
impl RocksDb {
    /// Opens the RocksDB database at `path` with RocksDB's default options.
    ///
    /// # Errors
    ///
    /// Returns the RocksDB failure wrapped in an [`Error`] of kind
    /// [`ErrorKind::Other`] if the database cannot be opened.
    pub fn new(path: &Path) -> Result<Self, Error> {
        rocksdb::DB::open_default(path)
            .map(|db| Self {
                rocks: Arc::new(db),
            })
            .map_err(|err| Error::new(ErrorKind::Other, err))
    }
}
#[async_trait]
impl Cache for RocksDb {
async fn put<C: Cacheable + Sync>(&self, key: C::Key<'_>, value: &C) -> Result<(), Error> {
let serialized = bincode::serde::encode_to_vec(value, BINCODE_CONFIG)
.map_err(|e| Error::new(ErrorKind::Other, e))?;
let rocks = self.rocks.clone();
let key = C::build_key(key);
tokio::task::spawn_blocking(move || {
rocks
.put(key, serialized)
.map_err(|e| Error::new(ErrorKind::Other, e))
})
.await
.map_err(|e| Error::new(ErrorKind::Other, e))?
}
async fn get<C: Cacheable + 'static>(
&self,
key: C::Key<'_>,
) -> Result<Option<Yoke<C, Vec<u8>>>, Error>
where
for<'a> <C as Yokeable<'a>>::Output: Deserialize<'a>,
{
let rocks = self.rocks.clone();
let key = C::build_key(key);
tokio::task::spawn_blocking(move || {
rocks
.get(key)
.map_err(|e| Error::new(ErrorKind::Other, e))?
.map(|v| {
Yoke::try_attach_to_cart(v, |v| {
bincode::serde::decode_borrowed_from_slice(v, BINCODE_CONFIG)
})
})
.transpose()
.map_err(|e| Error::new(ErrorKind::Other, e))
})
.await
.map_err(|e| Error::new(ErrorKind::Other, e))?
}
async fn remove<C: Cacheable>(&self, key: C::Key<'_>) -> Result<(), Error> {
let rocks = self.rocks.clone();
let key = C::build_key(key);
tokio::task::spawn_blocking(move || {
rocks
.delete(key)
.map_err(|e| Error::new(ErrorKind::Other, e))
})
.await
.map_err(|e| Error::new(ErrorKind::Other, e))?
}
}
/// Shorthand for a `T` yoked to an owned byte buffer, which is the shape of
/// value returned by [`Cache::get`].
pub type Yoked<T> = Yoke<T, Vec<u8>>;
#[cfg(test)]
mod test {
    use crate::cache::{Cache, InMemory, RocksDb};
    use crate::providers::{EligibilityCacheKey, Release};
    use std::borrow::Cow;
    use tempfile::tempdir;

    /// Backend-agnostic checks, run against every [`Cache`] implementation.
    ///
    /// Covers a miss, a cached `None`, overwriting with `Some`, isolation
    /// between keys, and removal of both a present and an absent key.
    async fn test_suite<T: Cache>(cache: T) {
        // The key every step below reads and writes.
        let key = || EligibilityCacheKey::new("my-project", "my-crate", "my-crate-version");

        // Nothing has been written yet, so the lookup is a miss.
        let out = cache
            .get::<Option<Release<'static>>>(key())
            .await
            .unwrap();
        assert!(out.is_none());

        // A stored `None` is a hit whose payload is `None`, distinct from a miss.
        cache.put(key(), &None).await.unwrap();
        let out = cache
            .get::<Option<Release<'static>>>(key())
            .await
            .unwrap();
        assert!(out.unwrap().get().is_none());

        // Writing again under the same key replaces the previous entry.
        cache
            .put(
                key(),
                &Some(Release {
                    name: Cow::Borrowed("helloworld"),
                    version: Cow::Borrowed("1.0.0"),
                    checksum: Cow::Borrowed("123456"),
                    project: Cow::Borrowed("test"),
                    yanked: false,
                }),
            )
            .await
            .unwrap();
        let out = cache
            .get::<Option<Release<'static>>>(key())
            .await
            .unwrap();
        assert_eq!(
            out.unwrap().get().as_ref().unwrap().name.as_ref(),
            "helloworld"
        );

        // A different version is a different key and must still miss.
        let out = cache
            .get::<Option<Release<'static>>>(EligibilityCacheKey::new(
                "my-project",
                "my-crate",
                "my-crate-version-2",
            ))
            .await
            .unwrap();
        assert!(out.is_none());

        // Removing the entry turns later lookups back into misses.
        cache
            .remove::<Option<Release<'static>>>(key())
            .await
            .unwrap();
        let out = cache
            .get::<Option<Release<'static>>>(key())
            .await
            .unwrap();
        assert!(out.is_none());

        // Removing a key that is no longer present is not an error.
        cache
            .remove::<Option<Release<'static>>>(key())
            .await
            .unwrap();
    }

    #[tokio::test]
    async fn rocksdb() {
        // Keep the directory handle alive until the suite has finished.
        let temp_dir = tempdir().unwrap();
        let cache = RocksDb::new(temp_dir.path()).unwrap();
        test_suite(cache).await;
    }

    #[tokio::test]
    async fn in_memory() {
        let cache = InMemory::default();
        test_suite(cache).await;
    }
}
@@ -19,6 +19,17 @@
pub listen_address: SocketAddr,
pub state_directory: PathBuf,
pub gitlab: GitlabConfig,
#[serde(default)]
pub cache: CacheStore,
}
/// Which cache backend to use.
///
/// Deserialised as an internally tagged enum: the `type` field selects the
/// variant and the remaining fields belong to that variant.
#[derive(Deserialize, Clone, Default)]
#[serde(rename_all = "kebab-case", tag = "type")]
pub enum CacheStore {
/// Selected with `type = "rocksdb"`; `path` is where the database is opened.
#[serde(rename = "rocksdb")]
RocksDb { path: PathBuf },
/// Selected with `type = "in-memory"` (the kebab-case form of the variant
/// name). This is also the `Default` value.
#[default]
InMemory,
}
impl FromStr for Config {
@@ -1,16 +1,22 @@
#![deny(clippy::pedantic)]
#![allow(clippy::missing_errors_doc, clippy::blocks_in_conditions)]
#![allow(
clippy::missing_errors_doc,
clippy::blocks_in_conditions,
clippy::module_name_repetitions
)]
pub mod cache;
pub mod config;
pub mod git_command_handlers;
pub mod metadata;
pub mod providers;
pub mod util;
use crate::cache::{Cache, ConcreteCache, Yoked};
use crate::{
config::Args,
metadata::{CargoConfig, CargoIndexCrateMetadata},
providers::{gitlab::Gitlab, PackageProvider, Release, ReleaseName, User, UserProvider},
providers::{gitlab::Gitlab, PackageProvider, Release, User, UserProvider},
util::get_crate_folder,
};
use anyhow::anyhow;
@@ -24,10 +30,8 @@
low_level::{HashOutput, PackFileEntry},
PktLine,
};
use parking_lot::RwLock;
use std::{
borrow::Cow,
collections::HashMap,
fmt::Write,
net::{SocketAddr, SocketAddrV6},
pin::Pin,
@@ -42,8 +46,9 @@
use thrussh_keys::key::PublicKey;
use tokio::sync::Semaphore;
use tokio_util::codec::{Decoder, Encoder as CodecEncoder};
use tracing::{error, info, info_span, instrument, trace, Instrument, Span};
use tracing::{debug, error, info, info_span, instrument, trace, Instrument, Span};
use uuid::Uuid;
use yoke::Yoke;
const AGENT: &str = concat!(
"agent=",
@@ -107,28 +112,25 @@
keys: vec![key],
..thrussh::server::Config::default()
});
let cache = ConcreteCache::new(&args.config)?;
let gitlab = Arc::new(Gitlab::new(&args.config.gitlab)?);
let gitlab = Arc::new(Gitlab::new(&args.config.gitlab, cache.clone())?);
thrussh::server::run(
thrussh_config,
&args.config.listen_address.to_string(),
Server {
gitlab,
metadata_cache: MetadataCache::default(),
},
Server { gitlab, cache },
)
.await?;
Ok(())
}
type MetadataCache = Arc<RwLock<HashMap<MetadataCacheKey<'static>, Arc<CargoIndexCrateMetadata>>>>;
struct Server<U: UserProvider + PackageProvider + Send + Sync + 'static> {
gitlab: Arc<U>,
metadata_cache: MetadataCache,
cache: ConcreteCache,
}
impl<U: UserProvider + PackageProvider + Send + Sync + 'static> thrussh::server::Server
@@ -152,7 +154,7 @@
input_bytes: BytesMut::new(),
output_bytes: BytesMut::new(),
is_git_protocol_v2: false,
metadata_cache: Arc::clone(&self.metadata_cache),
cache: self.cache.clone(),
span,
packfile_cache: None,
}
@@ -168,7 +170,7 @@
input_bytes: BytesMut,
output_bytes: BytesMut,
is_git_protocol_v2: bool,
metadata_cache: MetadataCache,
cache: ConcreteCache,
span: Span,
@@ -208,19 +210,21 @@
#[allow(clippy::type_complexity)]
async fn fetch_releases_by_crate(
&self,
) -> anyhow::Result<IndexMap<(U::CratePath, ReleaseName), Vec<Release>>> {
) -> anyhow::Result<IndexMap<Arc<str>, Vec<Yoked<Release<'static>>>>> {
let user = self.user()?;
let project = self.project()?;
let mut res = IndexMap::new();
let mut res = IndexMap::<Arc<str>, Vec<Yoked<Release<'static>>>>::new();
for (path, release) in Arc::clone(&self.gitlab)
for release in Arc::clone(&self.gitlab)
.fetch_releases_for_project(project, user)
.await?
{
res.entry((path, Arc::clone(&release.name)))
.or_insert_with(Vec::new)
.push(release);
if let Some(releases) = res.get_mut(release.get().name.as_ref()) {
releases.push(release);
} else {
res.insert(Arc::from(release.get().name.to_string()), vec![release]);
}
}
Ok(res)
@@ -232,48 +236,39 @@
async fn fetch_metadata(
gitlab: &U,
cache: &MetadataCache,
path: &U::CratePath,
cache: &ConcreteCache,
project: &str,
checksum: &str,
crate_name: &str,
crate_version: &str,
do_as: &Arc<User>,
) -> anyhow::Result<Arc<CargoIndexCrateMetadata>> {
let key = MetadataCacheKey {
checksum: checksum.into(),
crate_name: crate_name.into(),
crate_version: crate_version.into(),
};
) -> anyhow::Result<Yoked<CargoIndexCrateMetadata<'static>>> {
if let Some(cache) = cache
.get::<CargoIndexCrateMetadata<'static>>(checksum)
.await?
{
let reader = cache.read();
if let Some(cache) = reader.get(&key) {
return Ok(Arc::clone(cache));
}
debug!("Using metadata from cache");
return Ok(cache);
}
info!("Fetching metadata from GitLab");
let metadata = gitlab
.fetch_metadata_for_release(path, crate_version, do_as)
.fetch_metadata_for_release(project, crate_name, crate_version, do_as)
.await?;
let cksum = checksum.to_string();
let metadata = metadata::transform(metadata, crate_name, cksum)
.map(Arc::new)
.ok_or_else(|| anyhow!("the supplied metadata.json did contain the released crate"))?;
{
let mut writer = cache.write();
writer.insert(key.into_owned(), Arc::clone(&metadata));
}
cache.put(checksum, &metadata).await?;
Ok(metadata)
Ok(Yoke::attach_to_cart(Vec::new(), move |_| metadata))
}
@@ -326,20 +321,22 @@
let fetch_concurrency = Semaphore::new(PARALLEL_METADATA_FETCHES);
let mut metadata_fetches = FuturesOrdered::new();
let a = Instant::now();
for ((crate_path, crate_name), releases) in &releases_by_crate {
for (crate_name, releases) in &releases_by_crate {
for release in releases {
metadata_fetches.push_back({
let user = Arc::clone(user);
let gitlab = &self.gitlab;
let cache = &self.metadata_cache;
let cache = &self.cache;
let fetch_concurrency = &fetch_concurrency;
let checksum = &release.checksum;
let version = &release.version;
let crate_name = crate_name.as_ref();
let checksum = release.get().checksum.as_ref();
let version = release.get().version.as_ref();
let project = release.get().project.as_ref();
async move {
let _guard = fetch_concurrency.acquire().await?;
trace!("Fetching metadata for {crate_name}-{version}");
Self::fetch_metadata(
gitlab, cache, crate_path, checksum, crate_name, version, &user,
gitlab, cache, project, checksum, crate_name, version, &user,
)
.await
}
@@ -350,7 +347,7 @@
let mut buffer = BytesMut::new().writer();
for ((_, crate_name), releases) in &releases_by_crate {
for (crate_name, releases) in &releases_by_crate {
for release in releases {
match metadata_fetches
.next()
@@ -360,7 +357,7 @@
Ok(meta) => {
serde_json::to_writer(&mut buffer, &*meta)?;
serde_json::to_writer(&mut buffer, meta.get())?;
buffer.get_mut().put_u8(b'\n');
}
@@ -371,7 +368,7 @@
packfile.insert(
&get_crate_folder(crate_name),
Arc::clone(crate_name),
crate_name.to_string(),
buffer.get_mut().split().freeze(),
)?;
}
@@ -644,14 +641,4 @@
checksum: Cow<'a, str>,
crate_name: Cow<'a, str>,
crate_version: Cow<'a, str>,
}
impl MetadataCacheKey<'_> {
pub fn into_owned(self) -> MetadataCacheKey<'static> {
MetadataCacheKey {
checksum: self.checksum.into_owned().into(),
crate_name: self.crate_name.into_owned().into(),
crate_version: self.crate_version.into_owned().into(),
}
}
}
@@ -1,10 +1,12 @@
#![allow(clippy::module_name_repetitions)]
use crate::cache::{CacheKind, Cacheable};
use cargo_metadata::{DependencyKind, Package};
use cargo_platform::Platform;
use semver::{Version, VersionReq};
use serde::{Deserialize, Serialize};
use std::{borrow::Cow, collections::HashMap};
use yoke::Yokeable;
@@ -14,14 +16,14 @@
metadata: cargo_metadata::Metadata,
crate_name: &str,
cksum: String,
) -> Option<CargoIndexCrateMetadata> {
) -> Option<CargoIndexCrateMetadata<'static>> {
let package: Package = metadata
.packages
.into_iter()
.find(|v| v.name == crate_name)?;
Some(CargoIndexCrateMetadata {
name: package.name,
name: Cow::Owned(package.name),
vers: package.version,
deps: package
.dependencies
@@ -34,9 +36,9 @@
};
CargoIndexCrateMetadataDependency {
name,
name: Cow::Owned(name),
req: v.req,
features: v.features,
features: v.features.into_iter().map(Cow::Owned).collect(),
optional: v.optional,
default_features: v.uses_default_features,
target: v.target,
@@ -45,14 +47,18 @@
Cow::Borrowed("https://github.com/rust-lang/crates.io-index.git"),
Cow::Owned,
)),
package,
package: package.map(Cow::Owned),
}
})
.collect(),
cksum,
features: package.features,
features: package
.features
.into_iter()
.map(|(k, v)| (Cow::Owned(k), v.into_iter().map(Cow::Owned).collect()))
.collect(),
yanked: false,
links: package.links,
links: package.links.map(Cow::Owned),
})
}
@@ -61,26 +67,43 @@
pub dl: String,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct CargoIndexCrateMetadata {
name: String,
#[derive(Serialize, Deserialize, Debug, Yokeable)]
pub struct CargoIndexCrateMetadata<'a> {
#[serde(borrow)]
name: Cow<'a, str>,
vers: Version,
deps: Vec<CargoIndexCrateMetadataDependency>,
#[serde(borrow)]
deps: Vec<CargoIndexCrateMetadataDependency<'a>>,
cksum: String,
features: HashMap<String, Vec<String>>,
#[serde(borrow)]
features: HashMap<Cow<'a, str>, Vec<Cow<'a, str>>>,
yanked: bool,
links: Option<String>,
#[serde(borrow)]
links: Option<Cow<'a, str>>,
}
/// Lets crate index metadata be stored in the cache under a string key.
impl Cacheable for CargoIndexCrateMetadata<'static> {
    // NOTE(review): the callers visible in this change pass the crate
    // checksum as the key — confirm against all call sites.
    type Key<'b> = &'b str;

    const KIND: CacheKind = CacheKind::CrateMetadata;

    /// Appends the UTF-8 bytes of `k` to `out` unchanged.
    fn format_key(out: &mut Vec<u8>, k: Self::Key<'_>) {
        out.extend(k.bytes());
    }
}
#[derive(Serialize, Deserialize, Debug)]
pub struct CargoIndexCrateMetadataDependency {
name: String,
pub struct CargoIndexCrateMetadataDependency<'a> {
#[serde(borrow)]
name: Cow<'a, str>,
req: VersionReq,
features: Vec<String>,
#[serde(borrow)]
features: Vec<Cow<'a, str>>,
optional: bool,
default_features: bool,
target: Option<Platform>,
kind: DependencyKind,
registry: Option<Cow<'static, str>>,
package: Option<String>,
#[serde(borrow)]
registry: Option<Cow<'a, str>>,
#[serde(borrow)]
package: Option<Cow<'a, str>>,
}
@@ -1,7 +1,7 @@
#![allow(clippy::module_name_repetitions, clippy::blocks_in_conditions)]
mod checksums;
use crate::cache::{Cache, ConcreteCache, Yoked};
use crate::providers::EligibilityCacheKey;
use crate::{
config::{GitlabConfig, MetadataFormat},
providers::{Release, User},
@@ -9,17 +9,18 @@
use anyhow::Context;
use async_trait::async_trait;
use backoff::backoff::Backoff;
use checksums::ChecksumCache;
use futures::{stream::FuturesUnordered, StreamExt, TryStreamExt};
use percent_encoding::{utf8_percent_encode, NON_ALPHANUMERIC};
use reqwest::{header, Certificate};
use serde::{Deserialize, Serialize};
use smol_str::{format_smolstr, SmolStr};
use std::{sync::Arc, time::Duration};
use smol_str::SmolStr;
use std::borrow::Cow;
use std::sync::Arc;
use time::OffsetDateTime;
use tokio::sync::Semaphore;
use tracing::{debug, info_span, instrument, Instrument};
use tracing::{debug, info, info_span, instrument, Instrument};
use url::Url;
use yoke::Yoke;
const PARALLEL_PACKAGE_FILES_GETS: usize = 32;
@@ -30,12 +31,11 @@
token_expiry: time::Duration,
metadata_format: MetadataFormat,
admin_token: Option<String>,
checksums: ChecksumCache,
cache_checksums_older_than: Option<Duration>,
cache: ConcreteCache,
}
impl Gitlab {
pub fn new(config: &GitlabConfig) -> anyhow::Result<Self> {
pub fn new(config: &GitlabConfig, cache: ConcreteCache) -> anyhow::Result<Self> {
let mut client_builder = reqwest::ClientBuilder::new();
if let Some(cert_path) = &config.ssl_cert {
@@ -50,23 +50,52 @@
token_expiry: config.token_expiry,
metadata_format: config.metadata_format,
admin_token: config.admin_token.clone(),
checksums: <_>::default(),
cache_checksums_older_than: config.cache_releases_older_than,
cache,
})
}
async fn fetch_checksum(
&self,
key: checksums::Key,
#[instrument(skip_all, err)]
async fn check_release_is_eligible(
self: Arc<Self>,
release: GitlabPackageResponse,
do_as: &User,
) -> anyhow::Result<Option<Arc<str>>> {
if let Some(chksum) = self.checksums.get(&key) {
return Ok(Some(chksum));
) -> anyhow::Result<Yoked<Option<Release<'static>>>> {
let (raw_project, package_id) = {
let mut splitter = release.links.web_path.splitn(2, "/-/packages/");
match (splitter.next(), splitter.next()) {
(Some(project), Some(package)) => (&project[1..], package),
_ => return Ok(Yoke::attach_to_cart(Vec::new(), |_| None)),
}
};
let cache_key = EligibilityCacheKey::new(raw_project, &release.name, &release.version);
if let Some(cached) = self
.cache
.get::<Option<Release<'static>>>(cache_key)
.await
.context("failed to lookup release cache")?
{
debug!("Returning cached eligibility for release");
return Ok(cached);
}
info!("Fetching eligibility for release");
let project = utf8_percent_encode(raw_project, NON_ALPHANUMERIC);
let package_id = utf8_percent_encode(package_id, NON_ALPHANUMERIC);
let uri = self.base_url.join(&format!(
"projects/{project}/packages/{package_id}/package_files",
))?;
let package_files: Vec<GitlabPackageFilesResponse> = handle_error(
self.client
.get(key.fetch_url())
.get(uri)
.user_or_admin_token(do_as, &self.admin_token)
.send_retry_429()
.await?,
@@ -74,24 +103,39 @@
.await?
.json()
.await?;
let Some(file) = package_files
.into_iter()
.find(|package_file| package_file.file_name == key.file_name)
else {
return Ok(None);
};
if let Some(cache_older_than) = self.cache_checksums_older_than {
let cache_max_created = OffsetDateTime::now_utc() - cache_older_than;
if file.created_at < cache_max_created {
self.checksums.set(key, Arc::clone(&file.file_sha256));
}
if !package_files
.iter()
.any(|package_file| package_file.file_name == "metadata.json")
{
return Ok(Yoke::attach_to_cart(Vec::new(), |_| None));
}
let yanked = package_files
.iter()
.any(|package_file| package_file.file_name == "yanked");
let expected_file_name = format!("{}-{}.crate", release.name, release.version);
Ok(Some(file.file_sha256))
let release = package_files
.into_iter()
.find(|package_file| package_file.file_name == expected_file_name)
.map(|package_file| Release {
name: Cow::Owned(release.name.to_string()),
version: Cow::Owned(release.version.clone()),
checksum: Cow::Owned(package_file.file_sha256),
project: Cow::Owned(raw_project.to_string()),
yanked,
});
self.cache
.put(cache_key, &release)
.await
.context("failed to write to cache")?;
Ok(Yoke::attach_to_cart(Vec::new(), |_| release))
}
}
@@ -206,7 +250,7 @@
self: Arc<Self>,
project: &str,
do_as: &Arc<User>,
) -> anyhow::Result<Vec<(Self::CratePath, Release)>> {
) -> anyhow::Result<Vec<Yoked<Release<'static>>>> {
let mut next_uri = Some({
let mut uri = self.base_url.join(&format!(
"projects/{}/packages",
@@ -260,44 +304,9 @@
futures.push(
async move {
let _guard = fetch_concurrency.acquire().await?;
let (project, package) = {
let mut splitter = release.links.web_path.splitn(2, "/-/packages/");
match (splitter.next(), splitter.next()) {
(Some(project), Some(package)) => (&project[1..], package),
_ => return anyhow::Ok(None),
}
};
let package_path = Arc::new(GitlabCratePath {
project: utf8_percent_encode(project, NON_ALPHANUMERIC).to_string(),
package_name: utf8_percent_encode(&release.name, NON_ALPHANUMERIC)
.to_string(),
});
let key = checksums::Key {
base_url: this.base_url.as_str().into(),
project: project.into(),
package: package.into(),
file_name: format_smolstr!(
"{}-{}.crate",
release.name,
release.version
),
};
let checksum = this.fetch_checksum(key, &do_as).await?;
Ok(checksum.map(|checksum| {
(
Arc::clone(&package_path),
Release {
name: Arc::from(release.name),
version: release.version,
checksum,
},
)
}))
this.clone()
.check_release_is_eligible(release, &do_as)
.await
}
.instrument(info_span!("fetch_package_files")),
);
@@ -305,8 +314,8 @@
}
futures
.err_into()
.filter_map(|v| async { v.transpose() })
.map_ok(|v| v.try_map_project(|res, _| res.ok_or(())))
.filter_map(|v| async move { v.map(Result::ok).transpose() })
.try_collect()
.await
}
@@ -314,14 +323,19 @@
#[instrument(skip(self), err)]
async fn fetch_metadata_for_release(
&self,
path: &Self::CratePath,
project: &str,
crate_name: &str,
version: &str,
do_as: &Arc<User>,
) -> anyhow::Result<cargo_metadata::Metadata> {
let fmt = self.metadata_format;
let url = self
.base_url
.join(&path.file_uri(fmt.filename(), version))?;
let url = self.base_url.join(&format!(
"projects/{}/packages/generic/{}/{}/{}",
utf8_percent_encode(project, NON_ALPHANUMERIC),
utf8_percent_encode(crate_name, NON_ALPHANUMERIC),
utf8_percent_encode(version, NON_ALPHANUMERIC),
fmt.filename(),
))?;
fmt.decode(
self.client
@@ -395,7 +409,7 @@
pub file_name: SmolStr,
#[serde(with = "time::serde::rfc3339")]
pub created_at: time::OffsetDateTime,
pub file_sha256: Arc<str>,
pub file_sha256: String,
}
#[derive(Deserialize)]
@@ -1,7 +1,12 @@
pub mod gitlab;
use crate::cache::{CacheKind, Cacheable, Yoked};
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use std::borrow::Cow;
use std::io::Write;
use std::sync::Arc;
use yoke::Yokeable;
#[async_trait]
pub trait UserProvider {
@@ -25,11 +30,12 @@
self: Arc<Self>,
project: &str,
do_as: &Arc<User>,
) -> anyhow::Result<Vec<(Self::CratePath, Release)>>;
) -> anyhow::Result<Vec<Yoked<Release<'static>>>>;
async fn fetch_metadata_for_release(
&self,
path: &Self::CratePath,
project: &str,
crate_name: &str,
version: &str,
do_as: &Arc<User>,
) -> anyhow::Result<cargo_metadata::Metadata>;
@@ -42,13 +48,47 @@
pub id: u64,
pub username: String,
pub token: Option<String>,
}
pub type ReleaseName<'a> = Cow<'a, str>;
/// Borrowed key identifying one crate release in the eligibility cache.
///
/// The key is the triple of project, crate name and crate version. It only
/// borrows its parts, so it is cheap to build and to copy.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct EligibilityCacheKey<'a> {
    /// Project the release belongs to.
    project: &'a str,
    /// Name of the crate.
    crate_name: &'a str,
    /// Version string of the release.
    crate_version: &'a str,
}

impl<'a> EligibilityCacheKey<'a> {
    /// Builds a key from the project, crate name and crate version.
    ///
    /// The arguments are stored as given; nothing is normalised or validated.
    #[must_use]
    pub fn new(project: &'a str, crate_name: &'a str, crate_version: &'a str) -> Self {
        EligibilityCacheKey {
            project,
            crate_name,
            crate_version,
        }
    }
}
pub type ReleaseName = Arc<str>;
/// A crate release as exposed by a package provider and stored in the
/// eligibility cache.
///
/// Every string field is a `Cow`, and `#[serde(borrow)]` allows a
/// deserialized value to borrow its strings from the input buffer instead of
/// allocating. Providers hand these out as `Yoked<Release<'static>>`.
///
/// NOTE(review): the cache backend presumably encodes fields positionally;
/// confirm before reordering or inserting fields.
#[derive(Debug, Yokeable, Deserialize, Serialize)]
pub struct Release<'a> {
/// Name of the crate.
#[serde(borrow)]
pub name: ReleaseName<'a>,
/// Version string of this release.
#[serde(borrow)]
pub version: Cow<'a, str>,
/// Checksum of the release's `.crate` file (the GitLab provider fills this
/// from the package file's `file_sha256`).
#[serde(borrow)]
pub checksum: Cow<'a, str>,
/// Project the release was found in.
#[serde(borrow)]
pub project: Cow<'a, str>,
/// Whether the release is yanked (the GitLab provider sets this when the
/// package contains a file named `yanked`).
pub yanked: bool,
}
#[derive(Debug)]
pub struct Release {
pub name: ReleaseName,
pub version: String,
pub checksum: Arc<str>,
impl Cacheable for Option<Release<'static>> {
type Key<'b> = EligibilityCacheKey<'b>;
const KIND: CacheKind = CacheKind::Eligibility;
fn format_key(out: &mut Vec<u8>, k: Self::Key<'_>) {
out.reserve(k.project.len() + k.crate_name.len() + k.crate_version.len() + 2);
write!(out, "{}\0{}\0{}", k.project, k.crate_name, k.crate_version).unwrap();
}
}
@@ -1,40 +1,0 @@
use parking_lot::RwLock;
use percent_encoding::{utf8_percent_encode, NON_ALPHANUMERIC};
use smol_str::SmolStr;
use std::{collections::HashMap, sync::Arc};
/// In-memory map from a package-file [`Key`] to that file's checksum.
///
/// The map sits behind a read/write lock, so both methods take `&self`.
#[derive(Debug, Default)]
pub struct ChecksumCache {
    checksums: RwLock<HashMap<Key, Arc<str>>>,
}

impl ChecksumCache {
    /// Returns the checksum stored for `key`, or `None` if there is no entry.
    ///
    /// The returned `Arc` shares the stored string; no string data is copied.
    pub fn get(&self, key: &Key) -> Option<Arc<str>> {
        let entries = self.checksums.read();
        entries.get(key).map(Arc::clone)
    }

    /// Stores `checksum` under `key`, replacing any previous entry.
    pub fn set(&self, key: Key, checksum: Arc<str>) {
        let mut entries = self.checksums.write();
        entries.insert(key, checksum);
    }
}
/// Identifies one file of one package for checksum caching.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct Key {
    /// Base URL of the API; interpolated verbatim into the fetch URL.
    pub base_url: SmolStr,
    /// Project identifier; percent-encoded when building the fetch URL.
    pub project: SmolStr,
    /// Package identifier; percent-encoded when building the fetch URL.
    pub package: SmolStr,
    /// File name within the package. It is part of the key's identity but is
    /// not used in the fetch URL.
    pub file_name: SmolStr,
}

impl Key {
    /// Builds the URL that lists the files of this key's package:
    /// `{base_url}/projects/{project}/packages/{package}/package_files`.
    ///
    /// `project` and `package` are percent-encoded with `NON_ALPHANUMERIC`.
    ///
    /// NOTE(review): a `/` is always inserted after `base_url`, so a base URL
    /// that already ends in `/` yields `//projects/...`; confirm what callers pass.
    pub fn fetch_url(&self) -> String {
        let project = utf8_percent_encode(self.project.as_str(), NON_ALPHANUMERIC);
        let package = utf8_percent_encode(self.package.as_str(), NON_ALPHANUMERIC);

        format!(
            "{}/projects/{}/packages/{}/package_files",
            self.base_url, project, package,
        )
    }
}