#![deny(clippy::pedantic)]
#![allow(
clippy::missing_errors_doc,
clippy::blocks_in_conditions,
clippy::module_name_repetitions
)]
pub mod cache;
pub mod command;
pub mod config;
pub mod metadata;
pub mod providers;
pub mod util;
use crate::{
cache::{Cache, ConcreteCache, Yoked},
config::Args,
metadata::{CargoConfig, CargoIndexCrateMetadata},
providers::{gitlab::Gitlab, PackageProvider, Release, User, UserProvider},
util::get_crate_folder,
};
use anyhow::{anyhow, bail};
use bytes::{BufMut, Bytes, BytesMut};
use clap::Parser;
use futures::{stream::FuturesOrdered, Future, StreamExt};
use indexmap::IndexMap;
use packfile::{
codec::{Encoder, GitCodec},
high_level::GitRepository,
low_level::{HashOutput, PackFileEntry},
PktLine,
};
use semver::Version;
use std::{
fmt::Write,
net::{SocketAddr, SocketAddrV6},
pin::Pin,
str::FromStr,
sync::Arc,
time::Instant,
};
use thrussh::{
server::{Auth, Session},
ChannelId, CryptoVec,
};
use thrussh_keys::key::PublicKey;
use tokio::sync::Semaphore;
use tokio_util::codec::{Decoder, Encoder as CodecEncoder};
use tracing::{debug, error, info, info_span, instrument, trace, Instrument, Span};
use uuid::Uuid;
use yoke::Yoke;
/// Newline-terminated `agent=<name>/<version>` line, assembled at compile time
/// from the `CARGO_PKG_NAME` and `CARGO_PKG_VERSION` environment variables.
///
/// It is sent to the client as one of the capability lines written by
/// `exec_request` when handling `git-upload-pack`.
const AGENT: &str = concat!(
"agent=",
env!("CARGO_PKG_NAME"),
"/",
env!("CARGO_PKG_VERSION"),
"\n"
);
/// Number of permits in the semaphore that `build_packfile` uses to limit how
/// many crate metadata fetches are in flight at the same time.
const PARALLEL_METADATA_FETCHES: usize = 6;
/// Process entry point.
///
/// Sets up tracing (defaulting to the `INFO` level, pretty-printed in debug
/// builds), parses the command line, loads the server's Ed25519 private key
/// from the state directory (generating and persisting a new one when none
/// exists), and then runs the SSH server until it stops.
///
/// Fails if the state directory cannot be created, the stored key is not
/// exactly 64 bytes long, key generation fails, the cache or GitLab provider
/// cannot be constructed, or the SSH server returns an error.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let subscriber = tracing_subscriber::fmt().with_env_filter(
        tracing_subscriber::EnvFilter::builder()
            .with_default_directive(tracing::level_filters::LevelFilter::INFO.into())
            .from_env_lossy(),
    );

    #[cfg(debug_assertions)]
    let subscriber = subscriber.pretty();

    subscriber.init();

    let args: Args = Args::parse();

    if !args.config.state_directory.exists() {
        std::fs::create_dir_all(&args.config.state_directory)?;
    }

    let server_private_key = args.config.state_directory.join("ssh-private-key.pem");

    let key = if server_private_key.exists() {
        // The key is persisted as the raw 64 bytes of the Ed25519 secret key.
        let key_bytes = std::fs::read(&server_private_key)?;
        if key_bytes.len() != 64 {
            anyhow::bail!(
                "invalid private key. length = {}, expected = 64",
                key_bytes.len()
            );
        }

        let mut key = [0_u8; 64];
        key.copy_from_slice(&key_bytes);

        thrussh_keys::key::KeyPair::Ed25519(thrussh_keys::key::ed25519::SecretKey { key })
    } else {
        info!(
            "Generating new server private key to {}",
            server_private_key.display()
        );

        let key = thrussh_keys::key::KeyPair::generate_ed25519()
            .ok_or_else(|| anyhow!("failed to generate server private key"))?;
        let thrussh_keys::key::KeyPair::Ed25519(key) = key;

        // The file holds secret key material, so on Unix it is created
        // readable and writable by the owner only instead of inheriting the
        // default (usually world-readable) permissions.
        let mut options = std::fs::OpenOptions::new();
        options.write(true).create(true).truncate(true);
        #[cfg(unix)]
        {
            use std::os::unix::fs::OpenOptionsExt as _;
            options.mode(0o600);
        }
        let mut key_file = options.open(&server_private_key)?;
        std::io::Write::write_all(&mut key_file, &key.key)?;

        thrussh_keys::key::KeyPair::Ed25519(key)
    };

    // Only public key authentication is offered to clients.
    let thrussh_config = Arc::new(thrussh::server::Config {
        methods: thrussh::MethodSet::PUBLICKEY,
        keys: vec![key],
        ..thrussh::server::Config::default()
    });

    let cache = ConcreteCache::new(&args.config)?;
    let gitlab = Arc::new(Gitlab::new(&args.config.gitlab, cache.clone())?);

    thrussh::server::run(
        thrussh_config,
        &args.config.listen_address.to_string(),
        Server { gitlab, cache },
    )
    .await?;

    Ok(())
}
/// Factory for per-connection [`Handler`]s; holds the state that every
/// connection shares.
struct Server<U>
where
    U: UserProvider + PackageProvider + Send + Sync + 'static,
{
    /// Provider used to look up users and packages, shared by reference count
    /// with every handler.
    gitlab: Arc<U>,
    /// Cache handle that is cloned into every handler.
    cache: ConcreteCache,
}
impl<U> thrussh::server::Server for Server<U>
where
    U: UserProvider + PackageProvider + Send + Sync + 'static,
{
    type Handler = Handler<U>;

    /// Builds the handler for a freshly accepted connection.
    ///
    /// A random connection id is generated and, together with the peer
    /// address, attached to an `ssh` tracing span that the handler keeps for
    /// the lifetime of the connection.
    fn new(&mut self, peer_addr: Option<SocketAddr>) -> Self::Handler {
        // When no peer address is supplied, record `[::]:0` as a placeholder
        // so the span always has an address field.
        let peer_addr = peer_addr.unwrap_or_else(|| {
            let placeholder = SocketAddrV6::from_str("[::]:0").unwrap();
            SocketAddr::V6(placeholder)
        });
        let connection_id = Uuid::new_v4();

        let span = info_span!("ssh", ?peer_addr, ?connection_id);
        info!(parent: &span, "Incoming connection");

        let gitlab = Arc::clone(&self.gitlab);
        let cache = self.cache.clone();

        Handler {
            codec: GitCodec::default(),
            gitlab,
            user: None,
            project: None,
            input_bytes: BytesMut::new(),
            output_bytes: BytesMut::new(),
            is_git_protocol_v2: false,
            cache,
            span,
            packfile_cache: None,
        }
    }
}
/// State kept for a single SSH connection.
pub struct Handler<U>
where
    U: UserProvider + PackageProvider + Send + Sync + 'static,
{
    /// Decoder used to split `input_bytes` into frames.
    codec: GitCodec,
    /// Provider used for user lookup and for fetching releases and metadata.
    gitlab: Arc<U>,
    /// The authenticated user; `None` until public key authentication succeeds.
    user: Option<Arc<User>>,
    /// Project path taken from the `git-upload-pack` argument; `None` until
    /// such a request has supplied one.
    project: Option<Arc<str>>,
    /// Bytes received from the client that have not been decoded yet.
    input_bytes: BytesMut,
    /// Encoded output waiting to be sent to the client by `flush`.
    output_bytes: BytesMut,
    /// Whether the client sent the environment variable
    /// `GIT_PROTOCOL=version=2`.
    is_git_protocol_v2: bool,
    /// Cache handle used when fetching crate metadata.
    cache: ConcreteCache,
    /// Connection-level tracing span, used as parent for per-request spans.
    span: Span,
    /// Commit hash and packfile entries produced by the first successful
    /// `build_packfile` call, reused by later calls on this connection.
    packfile_cache: Option<Arc<(HashOutput, Vec<PackFileEntry>)>>,
}
impl<U: UserProvider + PackageProvider + Send + Sync + 'static> Handler<U> {
    /// Returns the authenticated user, or an error if none has been set yet.
    fn user(&self) -> anyhow::Result<&Arc<User>> {
        self.user
            .as_ref()
            .ok_or_else(|| anyhow::anyhow!("no user set"))
    }

    /// Returns the requested project path, or an error if none has been set
    /// yet.
    fn project(&self) -> anyhow::Result<&str> {
        self.project
            .as_deref()
            .ok_or_else(|| anyhow::anyhow!("no project set"))
    }

    /// Encodes `packet` and appends it to the pending output buffer.
    ///
    /// Nothing is sent to the client until `flush` is called.
    fn write(&mut self, packet: PktLine<'_>) -> Result<(), anyhow::Error> {
        Ok(Encoder.encode(packet, &mut self.output_bytes)?)
    }

    /// Sends everything in the pending output buffer to the client on
    /// `channel`, leaving the buffer empty.
    fn flush(&mut self, session: &mut Session, channel: ChannelId) {
        session.data(
            channel,
            CryptoVec::from_slice(self.output_bytes.split().as_ref()),
        );
    }

    /// Fetches every release of the current project as the current user and
    /// groups the releases by crate name.
    ///
    /// Crates appear in the order in which their first release was returned
    /// by the provider; releases within a crate keep the provider's order.
    #[instrument(skip(self), err)]
    #[allow(clippy::type_complexity)]
    async fn fetch_releases_by_crate(
        &self,
    ) -> anyhow::Result<IndexMap<Arc<str>, Vec<Yoked<Release<'static>>>>> {
        let user = self.user()?;
        let project = self.project()?;

        let mut res = IndexMap::<Arc<str>, Vec<Yoked<Release<'static>>>>::new();

        for release in Arc::clone(&self.gitlab)
            .fetch_releases_for_project(project, user)
            .await?
        {
            // Look the crate up by `&str` first so that a new key is only
            // allocated the first time a crate name is seen.
            if let Some(releases) = res.get_mut(release.get().name.as_ref()) {
                releases.push(release);
            } else {
                res.insert(Arc::from(release.get().name.to_string()), vec![release]);
            }
        }

        Ok(res)
    }

    /// Returns the cargo index metadata for one crate release, preferring the
    /// cache over a request to the provider.
    ///
    /// The cache is keyed by `checksum`. A cached entry is used when its
    /// version equals `crate_version`, or when `crate_version` cannot be
    /// parsed as a semantic version. Otherwise the metadata is fetched from
    /// the provider on behalf of `do_as`, transformed into index form, stored
    /// in the cache and returned.
    ///
    /// Fails if the cache or the provider fails, or if the fetched metadata
    /// does not contain the requested crate.
    async fn fetch_metadata(
        gitlab: &U,
        cache: &ConcreteCache,
        project: &str,
        checksum: &str,
        crate_name: &str,
        crate_version: &str,
        do_as: &Arc<User>,
    ) -> anyhow::Result<Yoked<CargoIndexCrateMetadata<'static>>> {
        let crate_v: Option<Version> = crate_version.parse().ok();
        let crate_v = crate_v.as_ref();

        if let Some(cached) = cache
            .get::<CargoIndexCrateMetadata<'static>>(checksum)
            .await?
        {
            if crate_v.is_none() || crate_v == Some(&cached.get().vers) {
                debug!(crate_name, crate_version, "Using metadata from cache");
                return Ok(cached);
            }
        }

        debug!(crate_name, crate_version, "Fetching metadata from GitLab");

        let metadata = gitlab
            .fetch_metadata_for_release(project, crate_name, crate_version, do_as)
            .await?;

        let cksum = checksum.to_string();
        // `transform` yields `None` when the release is absent from the
        // fetched metadata.
        let metadata = metadata::transform(metadata, crate_name, crate_v, cksum).ok_or_else(
            || anyhow!("the supplied metadata.json did not contain the released crate"),
        )?;

        cache.put(checksum, &metadata).await?;

        Ok(Yoke::attach_to_cart(Vec::new(), move |_| metadata))
    }

    /// Builds the git packfile that represents the registry index for the
    /// current project and user, returning the commit hash together with the
    /// packfile entries.
    ///
    /// The result is stored on the handler, so subsequent calls return the
    /// stored value without contacting the provider again.
    ///
    /// The repository contains a `config.json` with the download URI and one
    /// file per crate holding one JSON line per release. A release whose
    /// metadata cannot be fetched is logged and left out rather than failing
    /// the whole build.
    #[instrument(skip(self), err)]
    async fn build_packfile(&mut self) -> anyhow::Result<Arc<(HashOutput, Vec<PackFileEntry>)>> {
        if let Some(packfile_cache) = &self.packfile_cache {
            return Ok(Arc::clone(packfile_cache));
        }

        let mut packfile = GitRepository::default();

        let project = self.project()?;
        let user = self.user()?;

        // Use the token attached to the user when there is one, otherwise ask
        // the provider for one.
        let token = match &user.token {
            None => self.gitlab.fetch_token_for_user(user).await?,
            Some(token) => token.clone(),
        };

        let config_json = Bytes::from(serde_json::to_vec(&CargoConfig {
            dl: self.gitlab.cargo_dl_uri(project, &token)?,
        })?);
        packfile.insert(&[], "config.json", config_json)?;

        let started = Instant::now();
        let releases_by_crate = self.fetch_releases_by_crate().await?;
        info!("Fetched crate releases in {:.1?}", started.elapsed());

        // Every fetch is queued up front; the semaphore bounds how many of
        // them make progress at once, and the ordered queue hands results
        // back in the order they were pushed.
        let fetch_concurrency = Semaphore::new(PARALLEL_METADATA_FETCHES);
        let mut metadata_fetches = FuturesOrdered::new();

        let started = Instant::now();
        for (crate_name, releases) in &releases_by_crate {
            for release in releases {
                metadata_fetches.push_back({
                    let user = Arc::clone(user);
                    let gitlab = &self.gitlab;
                    let cache = &self.cache;
                    let fetch_concurrency = &fetch_concurrency;
                    let crate_name = crate_name.as_ref();
                    let checksum = release.get().checksum.as_ref();
                    let version = release.get().version.as_ref();
                    let project = release.get().project.as_ref();

                    async move {
                        let _guard = fetch_concurrency.acquire().await?;

                        trace!("Fetching metadata for {crate_name}-{version}");

                        Self::fetch_metadata(
                            gitlab, cache, project, checksum, crate_name, version, &user,
                        )
                        .await
                    }
                });
            }
        }

        // Walk the releases in the same order the fetches were queued, so
        // that each result taken from the queue belongs to `release`.
        let mut buffer = BytesMut::new().writer();
        for (crate_name, releases) in &releases_by_crate {
            for release in releases {
                match metadata_fetches
                    .next()
                    .await
                    .expect("invalid metadata_fetches")
                {
                    Ok(meta) => {
                        serde_json::to_writer(&mut buffer, meta.get())?;
                        buffer.get_mut().put_u8(b'\n');
                    }
                    Err(error) => error!(%error, ?release, "fetch_metadata failed"),
                }
            }

            // `split` hands over everything written for this crate and leaves
            // the buffer empty for the next one.
            packfile.insert(
                &get_crate_folder(crate_name),
                crate_name.to_string(),
                buffer.get_mut().split().freeze(),
            )?;
        }
        info!("Fetched crate metadata in {:.1?}", started.elapsed());

        let packfile = Arc::new(packfile.commit(
            env!("CARGO_PKG_NAME"),
            "noreply@chart.rs",
            "Latest crates from GitLab",
        )?);

        self.packfile_cache = Some(Arc::clone(&packfile));

        Ok(packfile)
    }
}
/// Unsized, `Send` future type that resolves to `Result<T, E>`, where `E` is
/// the error type of the `thrussh::server::Handler` implementation for
/// `Handler<U>`. It is used, pinned and boxed, for the handler's associated
/// future types.
type AsyncHandlerFut<T, U> =
dyn Future<Output = Result<T, <Handler<U> as thrussh::server::Handler>::Error>> + Send;
#[allow(clippy::type_complexity)]
impl<U: UserProvider + PackageProvider + Send + Sync + 'static> thrussh::server::Handler
for Handler<U>
{
type Error = anyhow::Error;
type FutureAuth = Pin<Box<AsyncHandlerFut<(Handler<U>, Auth), U>>>;
type FutureUnit = Pin<Box<AsyncHandlerFut<(Handler<U>, Session), U>>>;
type FutureBool = futures::future::Ready<anyhow::Result<(Self, Session, bool)>>;
fn finished_auth(self, auth: Auth) -> Self::FutureAuth {
Box::pin(futures::future::ready(Ok((self, auth))))
}
fn finished_bool(self, b: bool, session: Session) -> Self::FutureBool {
futures::future::ready(Ok((self, session, b)))
}
fn finished(self, session: Session) -> Self::FutureUnit {
Box::pin(futures::future::ready(Ok((self, session))))
}
fn auth_publickey(mut self, user: &str, public_key: &PublicKey) -> Self::FutureAuth {
let fingerprint = public_key.fingerprint();
let user = user.to_string();
let span = info_span!(parent: &self.span, "auth_publickey", ?fingerprint);
Box::pin(
capture_errors(async move {
let mut by_ssh_key = false;
let mut user = self
.gitlab
.find_user_by_username_password_combo(&user)
.await?;
if user.is_none() {
by_ssh_key = true;
user = self
.gitlab
.find_user_by_ssh_key(&util::format_fingerprint(&fingerprint))
.await?;
}
if let Some(user) = user {
info!(
"Successfully authenticated for GitLab user `{}` by {}",
&user.username,
if by_ssh_key {
"SSH Key"
} else {
"Build or Personal Token"
},
);
self.user = Some(Arc::new(user));
self.finished_auth(Auth::Accept).await
} else {
info!("Public key rejected");
self.finished_auth(Auth::Reject).await
}
})
.instrument(span),
)
}
fn data(mut self, channel: ChannelId, data: &[u8], mut session: Session) -> Self::FutureUnit {
let span = info_span!(parent: &self.span, "data");
self.input_bytes.extend_from_slice(data);
Box::pin(
capture_errors(async move {
let (commit_hash, packfile_entries) = &*self.build_packfile().await?;
while let Some(frame) = self.codec.decode(&mut self.input_bytes)? {
if frame.command.is_empty() {
session.exit_status_request(channel, 0);
session.eof(channel);
session.close(channel);
return Ok((self, session));
}
match frame.command.as_ref() {
b"command=ls-refs" => {
command::git_upload_pack::ls_refs::handle(
&mut self,
&mut session,
channel,
&frame.metadata,
commit_hash,
)?;
}
b"command=fetch" => {
command::git_upload_pack::fetch::handle(
&mut self,
&mut session,
channel,
&frame.metadata,
packfile_entries,
)?;
}
v => {
error!(
"Client sent unknown command, ignoring command {}",
std::str::from_utf8(v).unwrap_or("invalid utf8")
);
}
}
}
Ok((self, session))
})
.instrument(span),
)
}
fn env_request(
mut self,
_channel: ChannelId,
name: &str,
value: &str,
session: Session,
) -> Self::FutureUnit {
#[allow(clippy::single_match)]
match (name, value) {
("GIT_PROTOCOL", "version=2") => self.is_git_protocol_v2 = true,
_ => {}
}
Box::pin(futures::future::ready(Ok((self, session))))
}
fn shell_request(mut self, channel: ChannelId, mut session: Session) -> Self::FutureUnit {
let span = info_span!(parent: &self.span, "shell_request");
Box::pin(capture_errors(async move {
let user = Arc::clone(self.user()?);
write!(
&mut self.output_bytes,
"Hi there, {}! You've successfully authenticated, but {} does not provide shell access.\r\n",
user.username,
env!("CARGO_PKG_NAME")
)?;
info!("Shell requested, dropping connection");
self.flush(&mut session, channel);
session.close(channel);
Ok((self, session))
}).instrument(span))
}
fn exec_request(
mut self,
channel: ChannelId,
data: &[u8],
mut session: Session,
) -> Self::FutureUnit {
let span = info_span!(parent: &self.span, "exec_request");
let data = match std::str::from_utf8(data) {
Ok(data) => data,
Err(e) => {
return Box::pin(capture_errors(futures::future::err(e.into())).instrument(span))
}
};
let args = shlex::split(data);
Box::pin(capture_errors(async move {
if !self.is_git_protocol_v2 {
anyhow::bail!("not git protocol v2");
}
let mut args = args.into_iter().flat_map(Vec::into_iter);
match args.next().as_deref() {
Some("git-upload-pack") => {
let arg = args.next();
if let Some(project) = arg.as_deref()
.filter(|v| *v != "/")
.map(|project| project.trim_start_matches('/').trim_end_matches('/'))
.filter(|project| project.contains('/'))
{
self.project = Some(Arc::from(project.to_string()));
} else {
session.extended_data(channel, 1, CryptoVec::from_slice(indoc::indoc! {b"
\r\nNo project was given in the path part of the SSH URI. A GitLab group and project should be defined in your .cargo/config.toml as follows:
[registries]
my-project = {{ index = \"ssh://domain.to.registry.com/my-group/my-project\" }}\r\n
"}));
session.close(channel);
}
self.write(PktLine::Data(b"version 2\n"))?;
self.write(PktLine::Data(AGENT.as_bytes()))?;
self.write(PktLine::Data(b"ls-refs=unborn\n"))?;
self.write(PktLine::Data(b"fetch=shallow wait-for-done\n"))?;
self.write(PktLine::Data(b"server-option\n"))?;
self.write(PktLine::Data(b"object-info\n"))?;
self.write(PktLine::Flush)?;
self.flush(&mut session, channel);
}
Some("bust-cache") => {
if let Err(e) = command::bust_cache::handle(&mut self, &mut session, channel, args).await {
session.data(
channel,
CryptoVec::from(e.to_string()),
);
session.exit_status_request(channel, 1);
session.close(channel);
}
}
_ => bail!("invalid command"),
}
Ok((self, session))
}).instrument(span))
}
}
/// Drives `fut` to completion and passes its result through unchanged,
/// emitting an `error!` event with the error's display text when the result
/// is an `Err`.
async fn capture_errors<T>(
    fut: impl Future<Output = Result<T, anyhow::Error>>,
) -> Result<T, anyhow::Error> {
    match fut.await {
        Ok(value) => Ok(value),
        Err(e) => {
            error!("{e}");
            Err(e)
        }
    }
}