🏡 index : ~doyle/scrs.git

Diff

 scrs-serve/Cargo.toml     |  6 +++---
 scrs-serve/src/handler.rs | 24 ++++++++++++++++++------
 scrs-serve/src/main.rs    | 13 +++++++++++++
 3 files changed, 26 insertions(+), 17 deletions(-)

diff --git a/scrs-serve/Cargo.toml b/scrs-serve/Cargo.toml
index f599f0f..9db4a1b 100644
--- a/scrs-serve/Cargo.toml
+++ a/scrs-serve/Cargo.toml
@@ -11,9 +11,9 @@
futures = "0.3"
tokio = { version = "0.2", features = ["full"] }
http = "0.2"
slog = ""
slog-async = ""
slog-term = ""
slog = "2.5"
slog-async = "2.5"
slog-term = "2.6"
thiserror = "1.0"
bytes = "0.5"
httparse = "1.3"
diff --git a/scrs-serve/src/handler.rs b/scrs-serve/src/handler.rs
index 19cf661..e48a9b3 100644
--- a/scrs-serve/src/handler.rs
+++ a/scrs-serve/src/handler.rs
@@ -12,21 +12,21 @@

#[derive(Error, Debug)]
pub enum HandlerError {
    #[error("failed to parse request from client: `{0}`")]
    #[error("Failed to parse request from client: `{0}`")]
    RequestError(#[from] httparse::Error),
    #[error("failed to create response for client: `{0}`")]
    #[error("Failed to create response for client: `{0}`")]
    ResponseError(#[from] http::Error),
    #[error("other side didn't send valid a valid http request")]
    #[error("Other side didn't send valid a valid http request")]
    InvalidHttp,
    #[error("failed to create JSON blob response for client: `{0}`")]
    #[error("Failed to create JSON blob response for client: `{0}`")]
    JsonWriteError(serde_json::Error),
    #[error("i/o failed for client: `{0}`")]
    #[error("Client i/o failure: `{0}`")]
    IoError(#[from] std::io::Error),
    #[error("source attempted to stream data, but the stream no longer exists: `{0}`")]
    #[error("Source attempted to stream data, but the stream no longer exists: `{0}`")]
    StreamDisconnected(#[from] std::sync::mpsc::SendError<bytes::Bytes>),
    #[error("source is already connected to the requested mountpoint")]
    #[error("Source is already connected to the requested mountpoint")]
    StreamAlreadyConnected,
    #[error("source attempted to stream mime-type {} whereas the stream is defined as {}", .actual, .expected)]
    #[error("Source attempted to stream mime-type `{}` whereas the stream is defined as `{}`", .actual, .expected)]
    UnsupportedContentType {
        actual: String,
        expected: mime::Mime,
@@ -78,7 +78,7 @@
                $canonical_reason
            }
        );
        debug!($log, "Response: {}", &resp_head[..resp_head.len() - 2]);
        debug!($log, "Res: {}", &resp_head[..resp_head.len() - 2]);

        $writer.write_all(resp_head.as_bytes()).await?;

@@ -135,7 +135,7 @@
        Request::try_from(parsed)?
    };

    debug!(log, "Request: {} {} {:?}", req.method(), req.uri(), req.version();
    debug!(log, "Req: {} {} {:?}", req.method(), req.uri(), req.version();
        "user-agent" => req.headers().get("user-agent").and_then(|v| v.to_str().ok()));

    let resp = http::Response::builder()
@@ -155,7 +155,7 @@
            write_response!(log, conn, resp);

            let mut handle = stream.listen();
            debug!(log, "Streaming to new listener"; "mountpoint" => "/");
            debug!(log, "New listener connected"; "mountpoint" => "/");

            loop {
                if let Some(v) = handle.subscriber.next().await {
@@ -187,7 +187,7 @@
                write_response!(log, conn, resp);

                let metadata = StreamMetadata::from(req.headers());
                info!(log, "Accepted stream request";
                info!(log, "Source connected to stream";
                    "mountpoint" => "/",
                    "user-agent" => &metadata.user_agent,
                    "content-type" => &metadata.content_type,
diff --git a/scrs-serve/src/main.rs b/scrs-serve/src/main.rs
index f68c793..1385eb8 100644
--- a/scrs-serve/src/main.rs
+++ a/scrs-serve/src/main.rs
@@ -59,19 +59,28 @@

    info!(
        log,
        concat!(
            clap::crate_name!(),
            "/",
            clap::crate_version!(),
            " starting up..."
        )
    );
    info!(
        log,
        "Listening for new connections on {}", config.server.listen_address
    );

    loop {
        let (conn, remote) = listener.accept().await.unwrap();
        let log = log.new(o!("remote" => remote));
        debug!(log, "Accepted connection from {}", remote);
        debug!(log, "Accepted new connection");

        let stream = stream.clone();

        tokio::spawn(async move {
            if let Err(e) = handler::process(conn, stream, log.clone()).await {
                error!(log, "Error handling request: {}", e);
                error!(log, "{}", e);
            }
        });
    }