🏡 index : ~doyle/rgit.git

author Jordan Doyle <jordan@doyle.la> 2024-02-03 15:19:38.0 +00:00:00
committer Jordan Doyle <jordan@doyle.la> 2024-02-03 15:19:38.0 +00:00:00
commit
239a5d88419462669c475956c457157651dcd1bc [patch]
tree
9fa15f9fc65b5f4c54f832ffc8946d04f4ae81f6
parent
96928a28c708f08e2c64047ee314d77016a394ce
download
239a5d88419462669c475956c457157651dcd1bc.tar.gz

Improve smart_git performance via output streaming



Diff

 src/methods/repo/mod.rs       |   2 --
 src/methods/repo/smart_git.rs | 140 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++------------------
 2 files changed, 108 insertions(+), 34 deletions(-)

diff --git a/src/methods/repo/mod.rs b/src/methods/repo/mod.rs
index 6280480..257b3df 100644
--- a/src/methods/repo/mod.rs
+++ a/src/methods/repo/mod.rs
@@ -77,8 +77,6 @@

    let mut service = match uri_parts.pop() {
        Some("about") => h!(handle_about),
        // TODO: https://man.archlinux.org/man/git-http-backend.1.en
        // TODO: GIT_PROTOCOL
        Some("refs") if uri_parts.last() == Some(&"info") => {
            uri_parts.pop();
            h!(handle_smart_git)
diff --git a/src/methods/repo/smart_git.rs b/src/methods/repo/smart_git.rs
index 7d4463c..de98b30 100644
--- a/src/methods/repo/smart_git.rs
+++ a/src/methods/repo/smart_git.rs
@@ -1,19 +1,25 @@
use std::{io::ErrorKind, path::Path, process::Stdio, str::FromStr};
use std::{io, io::ErrorKind, path::Path, process::Stdio, str::FromStr};

use anyhow::{bail, Context};
use anyhow::{anyhow, Context};
use axum::{
    body::{boxed, Body},
    body::StreamBody,
    extract::BodyStream,
    headers::{HeaderMap, HeaderName, HeaderValue},
    http::{Method, Uri},
    response::Response,
    response::{IntoResponse, Response},
    Extension,
};
use bytes::{Buf, Bytes, BytesMut};
use futures::TryStreamExt;
use httparse::Status;
use tokio::process::Command;
use tokio::{
    io::AsyncReadExt,
    process::{Child, ChildStderr, ChildStdout, Command},
    sync::mpsc,
};
use tokio_stream::wrappers::ReceiverStream;
use tokio_util::io::StreamReader;
use tracing::warn;
use tracing::{debug, error, info_span, warn, Instrument};

use crate::{
    methods::repo::{Repository, RepositoryPath, Result},
@@ -28,7 +34,7 @@
    uri: Uri,
    headers: HeaderMap,
    body: BodyStream,
) -> Result<Response> {
) -> Result<impl IntoResponse> {
    let path = extract_path(&uri, &repository)?;

    let mut command = Command::new("git");
@@ -51,35 +57,104 @@
        .stdin(Stdio::piped())
        .stdout(Stdio::piped())
        .stderr(Stdio::piped())
        .kill_on_drop(true)
        .spawn()
        .context("Failed to spawn git http-backend")?;

    {
        let mut body =
            StreamReader::new(body.map_err(|e| std::io::Error::new(ErrorKind::Other, e)));
        let mut stdin = child.stdin.take().context("Stdin already taken")?;

        tokio::io::copy_buf(&mut body, &mut stdin)
    let mut stdout = child.stdout.take().context("Stdout already taken")?;
    let mut stderr = child.stderr.take().context("Stderr already taken")?;
    let mut stdin = child.stdin.take().context("Stdin already taken")?;

    // read request body and forward to stdin
    let mut body = StreamReader::new(body.map_err(|e| std::io::Error::new(ErrorKind::Other, e)));
    tokio::io::copy_buf(&mut body, &mut stdin)
        .await
        .context("Failed to copy bytes from request to command stdin")?;

    // wait for the headers back from git http-backend
    let mut out_buf = BytesMut::with_capacity(1024);
    let headers = loop {
        let n = stdout
            .read_buf(&mut out_buf)
            .await
            .context("Failed to copy bytes from request to command stdin")?;
            .context("Failed to read headers")?;
        if n == 0 {
            break None;
        }

        if let Some((body_offset, headers)) = parse_cgi_headers(&out_buf)? {
            out_buf.advance(body_offset);
            break Some(headers);
        }
    };

    // if the `headers` loop broke with `None`, the `git http-backend` didn't return any parseable
    // headers so there's no reason for us to continue. there may be something in stderr for us
    // though.
    let Some(headers) = headers else {
        print_status(&mut child, &mut stderr).await;
        return Err(anyhow!("Received incomplete response from git http-backend").into());
    };

    // stream the response back to the client
    let (body_send, body_recv) = mpsc::channel(8);
    tokio::spawn(
        forward_response_to_client(out_buf, body_send, stdout, stderr, child)
            .instrument(info_span!("git http-backend reader")),
    );

    Ok((headers, StreamBody::new(ReceiverStream::new(body_recv))))
}

/// Forwards the entirety of `stdout` to `body_send`, printing subprocess stderr and status on
/// completion.
///
/// Each iteration reads from `stdout` into `out_buf` and sends one item through `body_send`:
///
/// * a zero-length read is treated as the end of the stream: whatever is still held in
///   `out_buf` is sent and the loop stops;
/// * a read of `n` bytes sends the first `n` bytes of `out_buf` and keeps going;
/// * a read error is sent to the receiver as `Err` and the loop stops.
///
/// The loop also stops as soon as sending fails, i.e. the receiving half of the channel has
/// gone away. Afterwards `print_status` is awaited with the child and its stderr handle.
async fn forward_response_to_client(
    mut out_buf: BytesMut,
    body_send: mpsc::Sender<Result<Bytes, io::Error>>,
    mut stdout: ChildStdout,
    mut stderr: ChildStderr,
    mut child: Child,
) {
    loop {
        // `read_buf` appends to `out_buf`, so any bytes the caller left in it sit in front of
        // the newly read data.
        // NOTE(review): `split_to(n)` takes only the first `n` bytes. If `out_buf` arrives
        // non-empty, the same number of trailing bytes stays buffered after every read and
        // is only flushed by the `split()` on the zero-length read. Order is preserved, but
        // the tail of the stream is delayed until EOF; confirm this is intended.
        let (out, mut end) = match stdout.read_buf(&mut out_buf).await {
            Ok(0) => (Ok(out_buf.split().freeze()), true),
            Ok(n) => (Ok(out_buf.split_to(n).freeze()), false),
            Err(e) => (Err(e), true),
        };

        // A send error means the receiver was dropped; there is nobody left to stream to.
        if body_send.send(out).await.is_err() {
            warn!("Receiver went away during git http-backend call");
            end = true;
        }

        if end {
            break;
        }
    }

    // NOTE(review): the statements from `let out = child` down to the `warn!` call use `?` in
    // a function that declares no return type and leave an `if` block unclosed. They look
    // like leftovers of the previous implementation shown by the diff view; confirm before
    // building.
    let out = child
        .wait_with_output()
        .await
        .context("Failed to read git http-backend response")?;
    let resp = cgi_to_response(&out.stdout)?;

    if !out.stderr.is_empty() {
        warn!(
            "Git returned an error: `{}`",
            String::from_utf8_lossy(&out.stderr)
        );
    // Log the subprocess exit status together with whatever it wrote to stderr.
    print_status(&mut child, &mut stderr).await;
}

/// Prints the exit status of the `git` subprocess.

async fn print_status(child: &mut Child, stderr: &mut ChildStderr) {
    match tokio::try_join!(child.wait(), read_stderr(stderr)) {
        Ok((status, stderr)) if status.success() => {
            debug!(stderr, "git http-backend successfully shutdown")
        }
        Ok((status, stderr)) => error!(stderr, "git http-backend exited with status code {status}"),
        Err(e) => error!("Failed to wait on git http-backend shutdown: {e}"),
    }
}

    Ok(resp)
/// Reads `stderr` to the end of the stream and returns the collected bytes as text.
///
/// Byte sequences that are not valid UTF-8 are replaced with U+FFFD instead of being reported
/// as an error; only a failed read yields `Err`.
async fn read_stderr(stderr: &mut ChildStderr) -> io::Result<String> {
    let mut raw = Vec::new();
    stderr.read_to_end(&mut raw).await?;

    // Valid UTF-8 is taken over as-is; anything else goes through the lossy conversion.
    let text = match String::from_utf8(raw) {
        Ok(valid) => valid,
        Err(invalid) => String::from_utf8_lossy(invalid.as_bytes()).into_owned(),
    };
    Ok(text)
}

/// Extracts a single header (`header`) from `input` and, if it is present, sets its value as
/// the environment variable `env` on `output`.
fn extract_header(input: &HeaderMap, output: &mut Command, header: &str, env: &str) -> Result<()> {
    if let Some(value) = input.get(header) {
        output.env(env, value.to_str().context("Invalid header")?);
@@ -88,6 +163,7 @@
    Ok(())
}

/// Extract the path from the URL to determine the repository path.

fn extract_path<'a>(uri: &'a Uri, repository: &Path) -> Result<&'a str> {
    let path = uri.path();
    let path = path.strip_prefix('/').unwrap_or(path);
@@ -99,17 +175,17 @@
    }
}

// https://en.wikipedia.org/wiki/Common_Gateway_Interface
pub fn cgi_to_response(buffer: &[u8]) -> Result<Response, anyhow::Error> {
// Intercept headers from the spawned `git http-backend` CGI and rewrite them to
// an `axum::Response`.
pub fn parse_cgi_headers(buffer: &[u8]) -> Result<Option<(usize, Response<()>)>, anyhow::Error> {
    let mut headers = [httparse::EMPTY_HEADER; 10];
    let (body_offset, headers) = match httparse::parse_headers(buffer, &mut headers)? {
        Status::Complete(v) => v,
        Status::Partial => bail!("Git returned a partial response over CGI"),
        Status::Partial => return Ok(None),
    };

    let mut response = Response::new(boxed(Body::from(buffer[body_offset..].to_vec())));
    let mut response = Response::new(());

    // TODO: extract status header
    for header in headers {
        response.headers_mut().insert(
            HeaderName::from_str(header.name)
@@ -131,5 +207,5 @@
        }
    }

    Ok(response)
    Ok(Some((body_offset, response)))
}