From b019baa44a052831150c9c5d04b45a8f015e39d6 Mon Sep 17 00:00:00 2001 From: Jordan Doyle Date: Sat, 24 Feb 2024 19:18:54 +0000 Subject: [PATCH] Store eligibility and metadata caches in rocksdb Also renames the checksum cache to eligibility cache since it also now supports yanking. There will be a follow-up change to add a `bust-cache` SSH command to replace the delayed caching that was previously in place. --- CHANGELOG.md | 4 +++- Cargo.lock | 266 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ Cargo.toml | 6 ++++++ src/cache.rs | 311 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/config.rs | 11 +++++++++++ src/main.rs | 111 ++++++++++++++++++++++++++++++++++++++++++++++++++++---------------------------- src/metadata.rs | 59 +++++++++++++++++++++++++++++++++++++++++++---------------- src/providers/gitlab.rs | 172 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++------ src/providers/mod.rs | 56 ++++++++++++++++++++++++++++++++++++++++++++++++++------ src/providers/gitlab/checksums.rs | 40 ---------------------------------------- 10 files changed, 828 insertions(+), 208 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ba672bd..0e43ad8 100644 --- a/CHANGELOG.md +++ a/CHANGELOG.md @@ -6,7 +6,9 @@ - Add info logs for release & metadata fetch latency. - When fetching all releases handle 429 by backing off. - Improve fetch error logging. -- Add file checksum fetch caching controlled by `cache-releases-older-than` config. +- Added crate eligibility cache. +- Introduce configurable cache backend with a RocksDB implementation (set `cache.type = "rocksdb"` and `cache.path = "cache"` to use it), defaults to `cache.type = "in-memory"`. +- Support crate yanking by creating a `yanked` file on the release. # v0.1.4 diff --git a/Cargo.lock b/Cargo.lock index fe5259a..47f5492 100644 --- a/Cargo.lock +++ a/Cargo.lock @@ -179,6 +179,45 @@ ] [[package]] +name = "bincode" +version = "2.0.0-rc.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f11ea1a0346b94ef188834a65c068a03aec181c94896d481d7a0a40d85b0ce95" +dependencies = [ + "bincode_derive", + "serde", +] + +[[package]] +name = "bincode_derive" +version = "2.0.0-rc.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e30759b3b99a1b802a7a3aa21c85c3ded5c28e1c83170d82d70f08bbf7f3e4c" +dependencies = [ + "virtue", +] + +[[package]] +name = "bindgen" +version = "0.69.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a00dc851838a2120612785d195287475a3ac45514741da670b735818822129a0" +dependencies = [ + "bitflags 2.4.2", + "cexpr", + "clang-sys", + "itertools", + "lazy_static", + "lazycell", + "proc-macro2", + "quote", + "regex", + "rustc-hash", + "shlex", + "syn", +] + +[[package]] name = "bit-vec" version = "0.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -257,6 +296,17 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a2bd12c1caf447e69cd4528f47f94d203fd2582878ecb9e9465484c4148a8223" + +[[package]] +name = "bzip2-sys" +version = "0.1.11+1.0.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "736a955f3fa7875102d57c82b8cac37ec45224a07fd32d58f9f7a186b6cd4cdc" +dependencies = [ + "cc", + "libc", + "pkg-config", +] [[package]] name = "camino" @@ -297,6 +347,15 @@ checksum = "7f9fa1897e4325be0d68d48df6aa1a71ac2ed4d27723887e7754192705350730" dependencies = [ "libc", +] + +[[package]] +name = "cexpr" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6fac387a98bb7c37292057cffc56d62ecb629900026402633ae9160df93a8766" +dependencies = [ + "nom", ] [[package]] @@ -322,6 +381,17 @@ dependencies = [ "crypto-common", "inout", +] + +[[package]] +name = "clang-sys" +version = "1.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "67523a3b4be3ce1989d607a828d036249522dd9c1c8de7f4dd2dae43a37369d1" +dependencies = [ + "glob", + "libc", + "libloading", ] [[package]] @@ -499,6 +569,12 @@ "redox_users", "winapi", ] + +[[package]] +name = "either" +version = "1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "11157ac094ffbdde99aa67b23417ebdd801842852b500e395a45a9c0aac03e4a" [[package]] name = "encoding_rs" @@ -526,6 +602,12 @@ ] [[package]] +name = "fastrand" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "25cbce373ec4653f1a01a31e8a5e5ec0c622dc27ff9c4e6606eefef5cbbed4a5" + +[[package]] name = "flate2" version = "1.0.28" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -674,6 +756,7 @@ "arrayvec", "async-trait", "backoff", + "bincode", "bytes", "cargo-platform", "cargo_metadata", @@ -689,11 +772,13 @@ "parse_link_header", "percent-encoding", "reqwest", + "rocksdb", "semver", "serde", "serde_json", "shlex", "smol_str", + "tempfile", "thrussh", "thrussh-keys", "thrussh-libsodium", @@ -707,8 +792,15 @@ "urlencoding", "ustr", "uuid", + "yoke", "zstd", ] + +[[package]] +name = "glob" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" [[package]] name = "h2" @@ -900,6 +992,15 @@ version = "2.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f518f335dce6725a761382244631d86cf0ccb2863413590b31338feb467f9c3" + +[[package]] +name = "itertools" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba291022dbbd398a455acf126c1e341954079855bc60dfdda641363bd6922569" +dependencies = [ + "either", +] [[package]] name = "itoa" @@ -921,12 +1022,28 @@ version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" + +[[package]] +name = "lazycell" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" [[package]] name = "libc" version = "0.2.153" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c198f91728a82281a64e1f4f9eeb25d82cb32a5de251c6bd1b5154d63a8e7bd" + +[[package]] +name = "libloading" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c571b676ddfc9a8c12f1f3d3085a7b163966a8fd8098a90640953ce5f6170161" +dependencies = [ + "cfg-if", + "windows-sys 0.48.0", +] [[package]] name = "libredox" @@ -940,6 +1057,22 @@ ] [[package]] +name = "librocksdb-sys" +version = "0.16.0+8.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ce3d60bc059831dc1c83903fb45c103f75db65c5a7bf22272764d9cc683e348c" +dependencies = [ + "bindgen", + "bzip2-sys", + "cc", + "glob", + "libc", + "libz-sys", + "lz4-sys", + "zstd-sys", +] + +[[package]] name = "libsodium-sys" version = "0.2.7" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -949,6 +1082,17 @@ "libc", "pkg-config", "walkdir", +] + +[[package]] +name = "libz-sys" +version = "1.1.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "037731f5d3aaa87a5675e895b63ddff1a87624bc29f77004ea829809654e48f6" +dependencies = [ + "cc", + "pkg-config", + "vcpkg", ] [[package]] @@ -972,6 +1116,16 @@ version = "0.4.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" + +[[package]] +name = "lz4-sys" +version = "1.9.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57d27b317e207b10f69f5e75494119e391a96f48861ae870d1da6edac98ca900" +dependencies = [ + "cc", + "libc", +] [[package]] name = "matchers" @@ -999,6 +1153,12 @@ version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" + +[[package]] +name = "minimal-lexical" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" [[package]] name = "miniz_oxide" @@ -1018,6 +1178,16 @@ "libc", "wasi", "windows-sys 0.48.0", +] + +[[package]] +name = "nom" +version = "7.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d273983c5a657a70a3e8f2a01329822f3b8c8172b73826411a55751e404a0a4a" +dependencies = [ + "memchr", + "minimal-lexical", ] [[package]] @@ -1389,6 +1559,16 @@ "spin", "untrusted", "windows-sys 0.52.0", +] + +[[package]] +name = "rocksdb" +version = "0.22.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6bd13e55d6d7b8cd0ea569161127567cd587676c99f4472f779a0279aa60a7a7" +dependencies = [ + "libc", + "librocksdb-sys", ] [[package]] @@ -1396,6 +1576,12 @@ version = "0.1.23" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76" + +[[package]] +name = "rustc-hash" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" [[package]] name = "rustix" @@ -1624,6 +1810,12 @@ checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" [[package]] +name = "stable_deref_trait" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3" + +[[package]] name = "strsim" version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1651,6 +1843,17 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" + +[[package]] +name = "synstructure" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c8af7666ab7b6390ab78131fb5b0fce11d6b7a6951602017c35fa82800708971" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] [[package]] name = "system-configuration" @@ -1671,6 +1874,18 @@ dependencies = [ "core-foundation-sys", "libc", +] + +[[package]] +name = "tempfile" +version = "3.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a365e8cd18e44762ef95d87f284f4b5cd04107fec2ff3052bd6a3e6069669e67" +dependencies = [ + "cfg-if", + "fastrand", + "rustix", + "windows-sys 0.52.0", ] [[package]] @@ -2067,6 +2282,12 @@ version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" + +[[package]] +name = "virtue" +version = "0.0.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9dcc60c0624df774c82a0ef104151231d37da4962957d691c011c852b2473314" [[package]] name = "walkdir" @@ -2356,6 +2577,30 @@ dependencies = [ "bit-vec", "num-bigint", +] + +[[package]] +name = "yoke" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "65e71b2e4f287f467794c671e2b8f8a5f3716b3c829079a1c44740148eff07e4" +dependencies = [ + "serde", + "stable_deref_trait", + "yoke-derive", + "zerofrom", +] + +[[package]] +name = "yoke-derive" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e6936f0cce458098a201c245a11bef556c6a0181129c7034d10d76d1ec3a2b8" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "synstructure", ] [[package]] @@ -2372,10 +2617,31 @@ version = "0.7.32" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9ce1b18ccd8e73a9321186f97e46f9f04b778851177567b1975109d26a08d2a6" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "zerofrom" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "655b0814c5c0b19ade497851070c640773304939a6c0fd5f5fb43da0696d05b7" +dependencies = [ + "zerofrom-derive", +] + +[[package]] +name = "zerofrom-derive" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6a647510471d372f2e6c2e6b7219e44d8c574d24fdc11c610a61455782f18c3" dependencies = [ "proc-macro2", "quote", "syn", + "synstructure", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 7f411ce..09ca10f 100644 --- a/Cargo.toml +++ a/Cargo.toml @@ -9,6 +9,7 @@ arrayvec = "0.7" async-trait = "0.1" backoff = "0.4" +bincode = { version = "2.0.0-rc.3", features = ["serde"] } bytes = "1.1" cargo_metadata = "0.15" cargo-platform = "0.1" @@ -24,6 +25,7 @@ parse_link_header = "0.3" percent-encoding = "2.3" reqwest = { version = "0.11", default-features = false, features = ["json", "rustls-tls"] } +rocksdb = "0.22" semver = "1.0" serde = { version = "1.0", features = ["derive", "rc"] } serde_json = "1" @@ -42,7 +44,11 @@ urlencoding = "2.1" ustr = "0.10" uuid = { version = "1.1", features = ["v4"] } +yoke = { version = "0.7", features = ["derive"] } zstd = "0.13" + +[dev-dependencies] +tempfile = "3.10" [profile.release] lto = "thin" diff --git a/src/cache.rs b/src/cache.rs new file mode 100644 index 0000000..5d22f2a 100644 --- /dev/null +++ a/src/cache.rs @@ -1,0 +1,311 @@ +use std::collections::HashMap; +use std::{ + io::{Error, ErrorKind}, + path::Path, + sync::Arc, +}; + +use async_trait::async_trait; +use parking_lot::RwLock; +use serde::{Deserialize, Serialize}; +use yoke::{Yoke, Yokeable}; + +use crate::config::{CacheStore, Config}; + +pub const BINCODE_CONFIG: bincode::config::Configuration = bincode::config::standard(); + +/// Marker trait for values that can be stored within the cache +pub trait Cacheable: Serialize + Send + for<'a> Yokeable<'a> { + /// The key used to uniquely identify a cache item. + type Key<'a>: Send + 'a; + /// A unique kind for the `Cacheable` used to prefix the `Key`. + const KIND: CacheKind; + + /// Builds the key to store in the cache by prefixing `Self::format_key` with `Self::KIND`. + fn build_key(k: Self::Key<'_>) -> Vec { + let mut key = Vec::new(); + key.push(Self::KIND as u8); + Self::format_key(&mut key, k); + key + } + + /// Serializes `k` to `out`. + fn format_key(out: &mut Vec, k: Self::Key<'_>); +} + +/// A unique prefix for each type stored within the cache to prevent conflicts. +#[repr(u8)] +pub enum CacheKind { + Eligibility = 1, + CrateMetadata = 2, +} + +/// A generic-erased `Cache`. +#[derive(Clone)] +pub enum ConcreteCache { + RocksDb(RocksDb), + InMemory(InMemory), +} + +impl ConcreteCache { + /// Instantiates a new `Cache`. + pub fn new(config: &Config) -> Result { + Ok(match &config.cache { + CacheStore::RocksDb { path } => Self::RocksDb(RocksDb::new(path)?), + CacheStore::InMemory => Self::InMemory(InMemory::default()), + }) + } +} + +#[async_trait] +impl Cache for ConcreteCache { + async fn put(&self, key: C::Key<'_>, value: &C) -> Result<(), Error> { + match self { + Self::RocksDb(r) => r.put(key, value).await, + Self::InMemory(i) => i.put(key, value).await, + } + } + + async fn get( + &self, + key: C::Key<'_>, + ) -> Result>>, Error> + where + for<'a> >::Output: Deserialize<'a>, + { + match self { + Self::RocksDb(r) => r.get(key).await, + Self::InMemory(i) => i.get(key).await, + } + } + + async fn remove(&self, key: C::Key<'_>) -> Result<(), Error> { + match self { + Self::RocksDb(r) => r.remove::(key).await, + Self::InMemory(i) => i.remove::(key).await, + } + } +} + +#[async_trait] +pub trait Cache { + /// Inserts a value into the cache. + async fn put(&self, key: C::Key<'_>, value: &C) -> Result<(), Error>; + + /// Retrieves a value from the cache. + async fn get( + &self, + key: C::Key<'_>, + ) -> Result>>, Error> + where + for<'a> >::Output: Deserialize<'a>; + + /// Removes a value from the cache. + async fn remove(&self, key: C::Key<'_>) -> Result<(), Error>; +} + +#[derive(Clone, Default)] +#[allow(clippy::type_complexity)] +pub struct InMemory { + db: Arc, Box<[u8]>>>>, +} + +#[async_trait] +impl Cache for InMemory { + async fn put(&self, key: C::Key<'_>, value: &C) -> Result<(), Error> { + let serialized = bincode::serde::encode_to_vec(value, BINCODE_CONFIG) + .map_err(|e| Error::new(ErrorKind::Other, e))?; + let key = C::build_key(key); + + self.db + .write() + .insert(key.into_boxed_slice(), serialized.into_boxed_slice()); + + Ok(()) + } + + async fn get( + &self, + key: C::Key<'_>, + ) -> Result>>, Error> + where + for<'a> >::Output: Deserialize<'a>, + { + let key = C::build_key(key); + let Some(value) = self.db.read().get(key.as_slice()).map(|v| v.to_vec()) else { + return Ok(None); + }; + + Yoke::try_attach_to_cart(value, |v| { + bincode::serde::decode_borrowed_from_slice(v, BINCODE_CONFIG) + }) + .map(Some) + .map_err(|e| Error::new(ErrorKind::Other, e)) + } + + async fn remove(&self, key: C::Key<'_>) -> Result<(), Error> { + self.db.write().remove(C::build_key(key).as_slice()); + Ok(()) + } +} + +#[derive(Clone)] +pub struct RocksDb { + rocks: Arc, +} + +impl RocksDb { + pub fn new(path: &Path) -> Result { + let rocks = rocksdb::DB::open_default(path).map_err(|e| Error::new(ErrorKind::Other, e))?; + Ok(Self { + rocks: Arc::new(rocks), + }) + } +} + +#[async_trait] +impl Cache for RocksDb { + async fn put(&self, key: C::Key<'_>, value: &C) -> Result<(), Error> { + let serialized = bincode::serde::encode_to_vec(value, BINCODE_CONFIG) + .map_err(|e| Error::new(ErrorKind::Other, e))?; + + let rocks = self.rocks.clone(); + let key = C::build_key(key); + + tokio::task::spawn_blocking(move || { + rocks + .put(key, serialized) + .map_err(|e| Error::new(ErrorKind::Other, e)) + }) + .await + .map_err(|e| Error::new(ErrorKind::Other, e))? + } + + async fn get( + &self, + key: C::Key<'_>, + ) -> Result>>, Error> + where + for<'a> >::Output: Deserialize<'a>, + { + let rocks = self.rocks.clone(); + let key = C::build_key(key); + + tokio::task::spawn_blocking(move || { + rocks + .get(key) + .map_err(|e| Error::new(ErrorKind::Other, e))? + .map(|v| { + Yoke::try_attach_to_cart(v, |v| { + bincode::serde::decode_borrowed_from_slice(v, BINCODE_CONFIG) + }) + }) + .transpose() + .map_err(|e| Error::new(ErrorKind::Other, e)) + }) + .await + .map_err(|e| Error::new(ErrorKind::Other, e))? + } + + async fn remove(&self, key: C::Key<'_>) -> Result<(), Error> { + let rocks = self.rocks.clone(); + let key = C::build_key(key); + + tokio::task::spawn_blocking(move || { + rocks + .delete(key) + .map_err(|e| Error::new(ErrorKind::Other, e)) + }) + .await + .map_err(|e| Error::new(ErrorKind::Other, e))? + } +} + +pub type Yoked = Yoke>; + +#[cfg(test)] +mod test { + use crate::cache::{Cache, InMemory, RocksDb}; + use crate::providers::{EligibilityCacheKey, Release}; + use std::borrow::Cow; + use tempfile::tempdir; + + async fn test_suite(cache: T) { + let out = cache + .get::>>(EligibilityCacheKey::new( + "my-project", + "my-crate", + "my-crate-version", + )) + .await + .unwrap(); + assert!(out.is_none()); + + cache + .put( + EligibilityCacheKey::new("my-project", "my-crate", "my-crate-version"), + &None, + ) + .await + .unwrap(); + let out = cache + .get::>>(EligibilityCacheKey::new( + "my-project", + "my-crate", + "my-crate-version", + )) + .await + .unwrap(); + assert!(out.unwrap().get().is_none()); + + cache + .put( + EligibilityCacheKey::new("my-project", "my-crate", "my-crate-version"), + &Some(Release { + name: Cow::Borrowed("helloworld"), + version: Cow::Borrowed("1.0.0"), + checksum: Cow::Borrowed("123456"), + project: Cow::Borrowed("test"), + yanked: false, + }), + ) + .await + .unwrap(); + let out = cache + .get::>>(EligibilityCacheKey::new( + "my-project", + "my-crate", + "my-crate-version", + )) + .await + .unwrap(); + assert_eq!( + out.unwrap().get().as_ref().unwrap().name.as_ref(), + "helloworld" + ); + + let out = cache + .get::>>(EligibilityCacheKey::new( + "my-project", + "my-crate", + "my-crate-version-2", + )) + .await + .unwrap(); + assert!(out.is_none()); + } + + #[tokio::test] + async fn rocksdb() { + let temp_dir = tempdir().unwrap(); + let cache = RocksDb::new(temp_dir.path()).unwrap(); + + test_suite(cache).await; + } + + #[tokio::test] + async fn in_memory() { + let cache = InMemory::default(); + test_suite(cache).await; + } +} diff --git a/src/config.rs b/src/config.rs index 301bff4..3a9db39 100644 --- a/src/config.rs +++ a/src/config.rs @@ -19,6 +19,17 @@ pub listen_address: SocketAddr, pub state_directory: PathBuf, pub gitlab: GitlabConfig, + #[serde(default)] + pub cache: CacheStore, +} + +#[derive(Deserialize, Clone, Default)] +#[serde(rename_all = "kebab-case", tag = "type")] +pub enum CacheStore { + #[serde(rename = "rocksdb")] + RocksDb { path: PathBuf }, + #[default] + InMemory, } impl FromStr for Config { diff --git a/src/main.rs b/src/main.rs index a364429..f010a4c 100644 --- a/src/main.rs +++ a/src/main.rs @@ -1,16 +1,22 @@ #![deny(clippy::pedantic)] -#![allow(clippy::missing_errors_doc, clippy::blocks_in_conditions)] - +#![allow( + clippy::missing_errors_doc, + clippy::blocks_in_conditions, + clippy::module_name_repetitions +)] + +pub mod cache; pub mod config; pub mod git_command_handlers; pub mod metadata; pub mod providers; pub mod util; +use crate::cache::{Cache, ConcreteCache, Yoked}; use crate::{ config::Args, metadata::{CargoConfig, CargoIndexCrateMetadata}, - providers::{gitlab::Gitlab, PackageProvider, Release, ReleaseName, User, UserProvider}, + providers::{gitlab::Gitlab, PackageProvider, Release, User, UserProvider}, util::get_crate_folder, }; use anyhow::anyhow; @@ -24,10 +30,8 @@ low_level::{HashOutput, PackFileEntry}, PktLine, }; -use parking_lot::RwLock; use std::{ borrow::Cow, - collections::HashMap, fmt::Write, net::{SocketAddr, SocketAddrV6}, pin::Pin, @@ -42,8 +46,9 @@ use thrussh_keys::key::PublicKey; use tokio::sync::Semaphore; use tokio_util::codec::{Decoder, Encoder as CodecEncoder}; -use tracing::{error, info, info_span, instrument, trace, Instrument, Span}; +use tracing::{debug, error, info, info_span, instrument, trace, Instrument, Span}; use uuid::Uuid; +use yoke::Yoke; const AGENT: &str = concat!( "agent=", @@ -107,28 +112,25 @@ keys: vec![key], ..thrussh::server::Config::default() }); + + let cache = ConcreteCache::new(&args.config)?; - let gitlab = Arc::new(Gitlab::new(&args.config.gitlab)?); + let gitlab = Arc::new(Gitlab::new(&args.config.gitlab, cache.clone())?); thrussh::server::run( thrussh_config, &args.config.listen_address.to_string(), - Server { - gitlab, - metadata_cache: MetadataCache::default(), - }, + Server { gitlab, cache }, ) .await?; Ok(()) } -type MetadataCache = Arc, Arc>>>; - struct Server { gitlab: Arc, // todo: we could make our commit hash stable by leaving an update time // in this cache and using that as our commit time - metadata_cache: MetadataCache, + cache: ConcreteCache, } impl thrussh::server::Server @@ -152,7 +154,7 @@ input_bytes: BytesMut::new(), output_bytes: BytesMut::new(), is_git_protocol_v2: false, - metadata_cache: Arc::clone(&self.metadata_cache), + cache: self.cache.clone(), span, packfile_cache: None, } @@ -168,7 +170,7 @@ input_bytes: BytesMut, output_bytes: BytesMut, is_git_protocol_v2: bool, - metadata_cache: MetadataCache, + cache: ConcreteCache, span: Span, // Cache of the packfile generated for this user in case it's requested // more than once @@ -208,19 +210,21 @@ #[allow(clippy::type_complexity)] async fn fetch_releases_by_crate( &self, - ) -> anyhow::Result>> { + ) -> anyhow::Result, Vec>>>> { let user = self.user()?; let project = self.project()?; - let mut res = IndexMap::new(); + let mut res = IndexMap::, Vec>>>::new(); - for (path, release) in Arc::clone(&self.gitlab) + for release in Arc::clone(&self.gitlab) .fetch_releases_for_project(project, user) .await? { - res.entry((path, Arc::clone(&release.name))) - .or_insert_with(Vec::new) - .push(release); + if let Some(releases) = res.get_mut(release.get().name.as_ref()) { + releases.push(release); + } else { + res.insert(Arc::from(release.get().name.to_string()), vec![release]); + } } Ok(res) @@ -232,48 +236,39 @@ /// and hence verified they have permission to read it. async fn fetch_metadata( gitlab: &U, - cache: &MetadataCache, - path: &U::CratePath, + cache: &ConcreteCache, + project: &str, checksum: &str, crate_name: &str, crate_version: &str, do_as: &Arc, - ) -> anyhow::Result> { - let key = MetadataCacheKey { - checksum: checksum.into(), - crate_name: crate_name.into(), - crate_version: crate_version.into(), - }; - - // check if the crate metadata already exists in our cache, if it does - // we'll just return that + ) -> anyhow::Result>> { + if let Some(cache) = cache + .get::>(checksum) + .await? { - let reader = cache.read(); - if let Some(cache) = reader.get(&key) { - return Ok(Arc::clone(cache)); - } + debug!("Using metadata from cache"); + return Ok(cache); } + + info!("Fetching metadata from GitLab"); // fetch metadata from the provider let metadata = gitlab - .fetch_metadata_for_release(path, crate_version, do_as) + .fetch_metadata_for_release(project, crate_name, crate_version, do_as) .await?; // transform the `cargo metadata` output to the cargo index // format let cksum = checksum.to_string(); let metadata = metadata::transform(metadata, crate_name, cksum) - .map(Arc::new) .ok_or_else(|| anyhow!("the supplied metadata.json did contain the released crate"))?; // cache the transformed value so the next user to pull it // doesn't have to wait for _yet another_ gitlab call - { - let mut writer = cache.write(); - writer.insert(key.into_owned(), Arc::clone(&metadata)); - } + cache.put(checksum, &metadata).await?; - Ok(metadata) + Ok(Yoke::attach_to_cart(Vec::new(), move |_| metadata)) } // Builds the packfile for the current connection, and caches it in case @@ -326,20 +321,22 @@ let fetch_concurrency = Semaphore::new(PARALLEL_METADATA_FETCHES); let mut metadata_fetches = FuturesOrdered::new(); let a = Instant::now(); - for ((crate_path, crate_name), releases) in &releases_by_crate { + for (crate_name, releases) in &releases_by_crate { for release in releases { metadata_fetches.push_back({ let user = Arc::clone(user); let gitlab = &self.gitlab; - let cache = &self.metadata_cache; + let cache = &self.cache; let fetch_concurrency = &fetch_concurrency; - let checksum = &release.checksum; - let version = &release.version; + let crate_name = crate_name.as_ref(); + let checksum = release.get().checksum.as_ref(); + let version = release.get().version.as_ref(); + let project = release.get().project.as_ref(); async move { let _guard = fetch_concurrency.acquire().await?; trace!("Fetching metadata for {crate_name}-{version}"); Self::fetch_metadata( - gitlab, cache, crate_path, checksum, crate_name, version, &user, + gitlab, cache, project, checksum, crate_name, version, &user, ) .await } @@ -350,7 +347,7 @@ // a reusable buffer for writing the metadata json blobs out to // for each package let mut buffer = BytesMut::new().writer(); - for ((_, crate_name), releases) in &releases_by_crate { + for (crate_name, releases) in &releases_by_crate { for release in releases { match metadata_fetches .next() @@ -360,7 +357,7 @@ Ok(meta) => { // each crates file in the index is a metadata blob for // each version separated by a newline - serde_json::to_writer(&mut buffer, &*meta)?; + serde_json::to_writer(&mut buffer, meta.get())?; buffer.get_mut().put_u8(b'\n'); } // continue after errors, metadata files may be missing etc @@ -371,7 +368,7 @@ // insert the crate version metadata into the packfile packfile.insert( &get_crate_folder(crate_name), - Arc::clone(crate_name), + crate_name.to_string(), buffer.get_mut().split().freeze(), )?; } @@ -644,14 +641,4 @@ checksum: Cow<'a, str>, crate_name: Cow<'a, str>, crate_version: Cow<'a, str>, -} - -impl MetadataCacheKey<'_> { - pub fn into_owned(self) -> MetadataCacheKey<'static> { - MetadataCacheKey { - checksum: self.checksum.into_owned().into(), - crate_name: self.crate_name.into_owned().into(), - crate_version: self.crate_version.into_owned().into(), - } - } } diff --git a/src/metadata.rs b/src/metadata.rs index aed0ce9..4d4d140 100644 --- a/src/metadata.rs +++ a/src/metadata.rs @@ -1,10 +1,12 @@ #![allow(clippy::module_name_repetitions)] +use crate::cache::{CacheKind, Cacheable}; use cargo_metadata::{DependencyKind, Package}; use cargo_platform::Platform; use semver::{Version, VersionReq}; use serde::{Deserialize, Serialize}; use std::{borrow::Cow, collections::HashMap}; +use yoke::Yokeable; /// Transforms metadata from `cargo metadata` to the standard one-line JSON used in cargo registries. /// @@ -14,14 +16,14 @@ metadata: cargo_metadata::Metadata, crate_name: &str, cksum: String, -) -> Option { +) -> Option> { let package: Package = metadata .packages .into_iter() .find(|v| v.name == crate_name)?; Some(CargoIndexCrateMetadata { - name: package.name, + name: Cow::Owned(package.name), vers: package.version, deps: package .dependencies @@ -34,9 +36,9 @@ }; CargoIndexCrateMetadataDependency { - name, + name: Cow::Owned(name), req: v.req, - features: v.features, + features: v.features.into_iter().map(Cow::Owned).collect(), optional: v.optional, default_features: v.uses_default_features, target: v.target, @@ -45,14 +47,18 @@ Cow::Borrowed("https://github.com/rust-lang/crates.io-index.git"), Cow::Owned, )), - package, + package: package.map(Cow::Owned), } }) .collect(), cksum, - features: package.features, + features: package + .features + .into_iter() + .map(|(k, v)| (Cow::Owned(k), v.into_iter().map(Cow::Owned).collect())) + .collect(), yanked: false, - links: package.links, + links: package.links.map(Cow::Owned), }) } @@ -61,26 +67,43 @@ pub dl: String, } -#[derive(Serialize, Deserialize, Debug)] -pub struct CargoIndexCrateMetadata { - name: String, +#[derive(Serialize, Deserialize, Debug, Yokeable)] +pub struct CargoIndexCrateMetadata<'a> { + #[serde(borrow)] + name: Cow<'a, str>, vers: Version, - deps: Vec, + #[serde(borrow)] + deps: Vec>, cksum: String, - features: HashMap>, + #[serde(borrow)] + features: HashMap, Vec>>, yanked: bool, - links: Option, + #[serde(borrow)] + links: Option>, +} + +impl Cacheable for CargoIndexCrateMetadata<'static> { + type Key<'b> = &'b str; + const KIND: CacheKind = CacheKind::CrateMetadata; + + fn format_key(out: &mut Vec, k: Self::Key<'_>) { + out.extend_from_slice(k.as_bytes()); + } } #[derive(Serialize, Deserialize, Debug)] -pub struct CargoIndexCrateMetadataDependency { - name: String, +pub struct CargoIndexCrateMetadataDependency<'a> { + #[serde(borrow)] + name: Cow<'a, str>, req: VersionReq, - features: Vec, + #[serde(borrow)] + features: Vec>, optional: bool, default_features: bool, target: Option, kind: DependencyKind, - registry: Option>, - package: Option, + #[serde(borrow)] + registry: Option>, + #[serde(borrow)] + package: Option>, } diff --git a/src/providers/gitlab.rs b/src/providers/gitlab.rs index 2c9e863..897b80e 100644 --- a/src/providers/gitlab.rs +++ a/src/providers/gitlab.rs @@ -1,7 +1,7 @@ // blocks_in_conditions: didn't work with `#[instrument...`` usage #![allow(clippy::module_name_repetitions, clippy::blocks_in_conditions)] -mod checksums; - +use crate::cache::{Cache, ConcreteCache, Yoked}; +use crate::providers::EligibilityCacheKey; use crate::{ config::{GitlabConfig, MetadataFormat}, providers::{Release, User}, @@ -9,17 +9,18 @@ use anyhow::Context; use async_trait::async_trait; use backoff::backoff::Backoff; -use checksums::ChecksumCache; use futures::{stream::FuturesUnordered, StreamExt, TryStreamExt}; use percent_encoding::{utf8_percent_encode, NON_ALPHANUMERIC}; use reqwest::{header, Certificate}; use serde::{Deserialize, Serialize}; -use smol_str::{format_smolstr, SmolStr}; -use std::{sync::Arc, time::Duration}; +use smol_str::SmolStr; +use std::borrow::Cow; +use std::sync::Arc; use time::OffsetDateTime; use tokio::sync::Semaphore; -use tracing::{debug, info_span, instrument, Instrument}; +use tracing::{debug, info, info_span, instrument, Instrument}; use url::Url; +use yoke::Yoke; /// Number of `package_files` GETs to do in parallel. const PARALLEL_PACKAGE_FILES_GETS: usize = 32; @@ -30,12 +31,11 @@ token_expiry: time::Duration, metadata_format: MetadataFormat, admin_token: Option, - checksums: ChecksumCache, - cache_checksums_older_than: Option, + cache: ConcreteCache, } impl Gitlab { - pub fn new(config: &GitlabConfig) -> anyhow::Result { + pub fn new(config: &GitlabConfig, cache: ConcreteCache) -> anyhow::Result { let mut client_builder = reqwest::ClientBuilder::new(); if let Some(cert_path) = &config.ssl_cert { @@ -50,23 +50,52 @@ token_expiry: config.token_expiry, metadata_format: config.metadata_format, admin_token: config.admin_token.clone(), - checksums: <_>::default(), - cache_checksums_older_than: config.cache_releases_older_than, + cache, }) } - async fn fetch_checksum( - &self, - key: checksums::Key, + /// Checks if the given release has a `metadata.json` and matching `{release-name}-{release-version}.crate` + /// file, if it does then returns an `Ok(Some(Release))` result containing metadata about the + /// release, otherwise `Ok(None)` will be returned meaning the release isn't eligible. + #[instrument(skip_all, err)] + async fn check_release_is_eligible( + self: Arc, + release: GitlabPackageResponse, do_as: &User, - ) -> anyhow::Result>> { - if let Some(chksum) = self.checksums.get(&key) { - return Ok(Some(chksum)); + ) -> anyhow::Result>>> { + let (raw_project, package_id) = { + let mut splitter = release.links.web_path.splitn(2, "/-/packages/"); + match (splitter.next(), splitter.next()) { + (Some(project), Some(package)) => (&project[1..], package), + _ => return Ok(Yoke::attach_to_cart(Vec::new(), |_| None)), + } + }; + + // we've already verified that the user has access to this package as this function is + // only ever called after its been seen from the API in `get_releases` + let cache_key = EligibilityCacheKey::new(raw_project, &release.name, &release.version); + if let Some(cached) = self + .cache + .get::>>(cache_key) + .await + .context("failed to lookup release cache")? + { + debug!("Returning cached eligibility for release"); + return Ok(cached); } + + info!("Fetching eligibility for release"); + let project = utf8_percent_encode(raw_project, NON_ALPHANUMERIC); + let package_id = utf8_percent_encode(package_id, NON_ALPHANUMERIC); + + let uri = self.base_url.join(&format!( + "projects/{project}/packages/{package_id}/package_files", + ))?; + let package_files: Vec = handle_error( self.client - .get(key.fetch_url()) + .get(uri) .user_or_admin_token(do_as, &self.admin_token) .send_retry_429() .await?, @@ -74,24 +103,39 @@ .await? .json() .await?; - - let Some(file) = package_files - .into_iter() - .find(|package_file| package_file.file_name == key.file_name) - else { - return Ok(None); - }; - // if `cache_checksums_older_than` is configured and this file is old enough - // cache the checksum to avoid having to fetch again - if let Some(cache_older_than) = self.cache_checksums_older_than { - let cache_max_created = OffsetDateTime::now_utc() - cache_older_than; - if file.created_at < cache_max_created { - self.checksums.set(key, Arc::clone(&file.file_sha256)); - } + // any crate releases must contain a metadata.json + if !package_files + .iter() + .any(|package_file| package_file.file_name == "metadata.json") + { + return Ok(Yoke::attach_to_cart(Vec::new(), |_| None)); } + + let yanked = package_files + .iter() + .any(|package_file| package_file.file_name == "yanked"); + + let expected_file_name = format!("{}-{}.crate", release.name, release.version); - Ok(Some(file.file_sha256)) + // grab the sha256 checksum of the .crate file itself + let release = package_files + .into_iter() + .find(|package_file| package_file.file_name == expected_file_name) + .map(|package_file| Release { + name: Cow::Owned(release.name.to_string()), + version: Cow::Owned(release.version.clone()), + checksum: Cow::Owned(package_file.file_sha256), + project: Cow::Owned(raw_project.to_string()), + yanked, + }); + + self.cache + .put(cache_key, &release) + .await + .context("failed to write to cache")?; + + Ok(Yoke::attach_to_cart(Vec::new(), |_| release)) } } @@ -206,7 +250,7 @@ self: Arc, project: &str, do_as: &Arc, - ) -> anyhow::Result> { + ) -> anyhow::Result>>> { let mut next_uri = Some({ let mut uri = self.base_url.join(&format!( "projects/{}/packages", @@ -260,44 +304,9 @@ futures.push( async move { let _guard = fetch_concurrency.acquire().await?; - - let (project, package) = { - let mut splitter = release.links.web_path.splitn(2, "/-/packages/"); - match (splitter.next(), splitter.next()) { - (Some(project), Some(package)) => (&project[1..], package), - _ => return anyhow::Ok(None), - } - }; - - let package_path = Arc::new(GitlabCratePath { - project: utf8_percent_encode(project, NON_ALPHANUMERIC).to_string(), - package_name: utf8_percent_encode(&release.name, NON_ALPHANUMERIC) - .to_string(), - }); - - let key = checksums::Key { - base_url: this.base_url.as_str().into(), - project: project.into(), - package: package.into(), - file_name: format_smolstr!( - "{}-{}.crate", - release.name, - release.version - ), - }; - - let checksum = this.fetch_checksum(key, &do_as).await?; - - Ok(checksum.map(|checksum| { - ( - Arc::clone(&package_path), - Release { - name: Arc::from(release.name), - version: release.version, - checksum, - }, - ) - })) + this.clone() + .check_release_is_eligible(release, &do_as) + .await } .instrument(info_span!("fetch_package_files")), ); @@ -305,8 +314,8 @@ } futures - .err_into() - .filter_map(|v| async { v.transpose() }) + .map_ok(|v| v.try_map_project(|res, _| res.ok_or(()))) + .filter_map(|v| async move { v.map(Result::ok).transpose() }) .try_collect() .await } @@ -314,14 +323,19 @@ #[instrument(skip(self), err)] async fn fetch_metadata_for_release( &self, - path: &Self::CratePath, + project: &str, + crate_name: &str, version: &str, do_as: &Arc, ) -> anyhow::Result { let fmt = self.metadata_format; - let url = self - .base_url - .join(&path.file_uri(fmt.filename(), version))?; + let url = self.base_url.join(&format!( + "projects/{}/packages/generic/{}/{}/{}", + utf8_percent_encode(project, NON_ALPHANUMERIC), + utf8_percent_encode(crate_name, NON_ALPHANUMERIC), + utf8_percent_encode(version, NON_ALPHANUMERIC), + fmt.filename(), + ))?; fmt.decode( self.client @@ -395,7 +409,7 @@ pub file_name: SmolStr, #[serde(with = "time::serde::rfc3339")] pub created_at: time::OffsetDateTime, - pub file_sha256: Arc, + pub file_sha256: String, } #[derive(Deserialize)] diff --git a/src/providers/mod.rs b/src/providers/mod.rs index 0b22d99..f110d11 100644 --- a/src/providers/mod.rs +++ a/src/providers/mod.rs @@ -1,7 +1,12 @@ pub mod gitlab; +use crate::cache::{CacheKind, Cacheable, Yoked}; use async_trait::async_trait; +use serde::{Deserialize, Serialize}; +use std::borrow::Cow; +use std::io::Write; use std::sync::Arc; +use yoke::Yokeable; #[async_trait] pub trait UserProvider { @@ -25,11 +30,12 @@ self: Arc, project: &str, do_as: &Arc, - ) -> anyhow::Result>; + ) -> anyhow::Result>>>; async fn fetch_metadata_for_release( &self, - path: &Self::CratePath, + project: &str, + crate_name: &str, version: &str, do_as: &Arc, ) -> anyhow::Result; @@ -42,13 +48,47 @@ pub id: u64, pub username: String, pub token: Option, +} + +pub type ReleaseName<'a> = Cow<'a, str>; + +#[derive(Debug, Hash, Clone, Copy, PartialEq, Eq)] +pub struct EligibilityCacheKey<'a> { + project: &'a str, + crate_name: &'a str, + crate_version: &'a str, +} + +impl<'a> EligibilityCacheKey<'a> { + #[must_use] + pub fn new(project: &'a str, crate_name: &'a str, crate_version: &'a str) -> Self { + Self { + project, + crate_name, + crate_version, + } + } } -pub type ReleaseName = Arc; +#[derive(Debug, Yokeable, Deserialize, Serialize)] +pub struct Release<'a> { + #[serde(borrow)] + pub name: ReleaseName<'a>, + #[serde(borrow)] + pub version: Cow<'a, str>, + #[serde(borrow)] + pub checksum: Cow<'a, str>, + #[serde(borrow)] + pub project: Cow<'a, str>, + pub yanked: bool, +} -#[derive(Debug)] -pub struct Release { - pub name: ReleaseName, - pub version: String, - pub checksum: Arc, +impl Cacheable for Option> { + type Key<'b> = EligibilityCacheKey<'b>; + const KIND: CacheKind = CacheKind::Eligibility; + + fn format_key(out: &mut Vec, k: Self::Key<'_>) { + out.reserve(k.project.len() + k.crate_name.len() + k.crate_version.len() + 2); + write!(out, "{}\0{}\0{}", k.project, k.crate_name, k.crate_version).unwrap(); + } } diff --git a/src/providers/gitlab/checksums.rs b/src/providers/gitlab/checksums.rs deleted file mode 100644 index 06f2ef7..0000000 100644 --- a/src/providers/gitlab/checksums.rs +++ /dev/null @@ -1,40 +1,0 @@ -use parking_lot::RwLock; -use percent_encoding::{utf8_percent_encode, NON_ALPHANUMERIC}; -use smol_str::SmolStr; -use std::{collections::HashMap, sync::Arc}; - -/// Cache of fetched `/package_files` checksums fetched from -/// -#[derive(Debug, Default)] -pub struct ChecksumCache { - checksums: RwLock>>, -} - -impl ChecksumCache { - pub fn get(&self, key: &Key) -> Option> { - self.checksums.read().get(key).cloned() - } - - pub fn set(&self, key: Key, checksum: Arc) { - self.checksums.write().insert(key, checksum); - } -} - -#[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub struct Key { - pub base_url: SmolStr, - pub project: SmolStr, - pub package: SmolStr, - pub file_name: SmolStr, -} - -impl Key { - pub fn fetch_url(&self) -> String { - format!( - "{}/projects/{}/packages/{}/package_files", - self.base_url, - utf8_percent_encode(self.project.as_str(), NON_ALPHANUMERIC), - utf8_percent_encode(self.package.as_str(), NON_ALPHANUMERIC), - ) - } -} -- rgit 0.1.3