🏡 index : ~doyle/pisshoff.git

author Jordan Doyle <jordan@doyle.la> 2023-06-26 0:05:18.0 +00:00:00
committer Jordan Doyle <jordan@doyle.la> 2023-06-26 0:51:41.0 +00:00:00
commit
b947760e2257ee96b9e150b4666be386f3628da9 [patch]
tree
8bc932e76dc6bcf18259420376634323b365fffe
parent
c7068ca36462d8c8f3464e874226d3ad763564eb
download
b947760e2257ee96b9e150b4666be386f3628da9.tar.gz

Add timescaledb exporter application



Diff

 Cargo.lock                                               | 441 ++++++++++-
 Cargo.toml                                               |  31 +-
 config.toml                                              |  10 +-
 flake.nix                                                |  84 +-
 pisshoff-server/Cargo.toml                               |  27 +-
 pisshoff-server/config.toml                              |  10 +-
 pisshoff-server/src/audit.rs                             |  76 ++-
 pisshoff-server/src/command.rs                           |  51 +-
 pisshoff-server/src/config.rs                            |  72 ++-
 pisshoff-server/src/main.rs                              | 110 +++-
 pisshoff-server/src/server.rs                            | 620 ++++++++++++++++-
 pisshoff-server/src/state.rs                             |  22 +-
 pisshoff-timescaledb-exporter/Cargo.toml                 |  23 +-
 pisshoff-timescaledb-exporter/config.toml                |   7 +-
 pisshoff-timescaledb-exporter/migrations/V1__initial.sql |  30 +-
 pisshoff-timescaledb-exporter/src/config.rs              |  39 +-
 pisshoff-timescaledb-exporter/src/main.rs                | 156 ++++-
 pisshoff-types/Cargo.toml                                |  12 +-
 pisshoff-types/src/audit.rs                              | 162 ++++-
 pisshoff-types/src/lib.rs                                |   4 +-
 src/audit.rs                                             | 233 +------
 src/command.rs                                           |  51 +-
 src/config.rs                                            |  72 +--
 src/main.rs                                              | 103 +---
 src/server.rs                                            | 613 +----------------
 src/state.rs                                             |  22 +-
 26 files changed, 1931 insertions(+), 1150 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index bc4764f..8645ef6 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -22,6 +22,15 @@ dependencies = [
]

[[package]]
name = "aho-corasick"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "43f6cb1bf222025340178f382c426f13757b2960e89779dfcb319c32542a5a41"
dependencies = [
 "memchr",
]

[[package]]
name = "anstream"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -77,12 +86,29 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c7d0618f0e0b7e8ff11427422b64564d5fb0be1940354bfe2e0529b18a9d9b8"

[[package]]
name = "async-trait"
version = "0.1.68"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b9ccdd8f2a161be9bd5c023df56f1b2a0bd1d83872ae53b71a84a12c9bf6e842"
dependencies = [
 "proc-macro2",
 "quote",
 "syn 2.0.20",
]

[[package]]
name = "autocfg"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"

[[package]]
name = "base64"
version = "0.21.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "604178f6c5c21f02dc555784810edfb88d34ac2c73b2eae109655649ee73ce3d"

[[package]]
name = "base64ct"
version = "1.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -232,7 +258,7 @@ dependencies = [
 "heck",
 "proc-macro2",
 "quote",
 "syn",
 "syn 2.0.20",
]

[[package]]
@@ -311,6 +337,42 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c2e66c9d817f1720209181c316d28635c050fa304f9c79e47a520882661b7308"

[[package]]
name = "deadpool"
version = "0.9.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "421fe0f90f2ab22016f32a9881be5134fdd71c65298917084b0c7477cbc3856e"
dependencies = [
 "async-trait",
 "deadpool-runtime",
 "num_cpus",
 "retain_mut",
 "serde",
 "tokio",
]

[[package]]
name = "deadpool-postgres"
version = "0.10.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "836a24a9d49deefe610b8b60c767a7412e9a931d79a89415cd2d2d71630ca8d7"
dependencies = [
 "deadpool",
 "log",
 "serde",
 "tokio",
 "tokio-postgres",
]

[[package]]
name = "deadpool-runtime"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eaa37046cc0f6c3cc6090fbdbf73ef0b8ef4cfcc37f6befc0020f63e8cf121e1"
dependencies = [
 "tokio",
]

[[package]]
name = "digest"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -384,6 +446,12 @@ dependencies = [
]

[[package]]
name = "fallible-iterator"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4443176a9f2c162692bd3d352d745ef9413eec5782a80d8fd6f8a1ac692a07f7"

[[package]]
name = "fastrand"
version = "1.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -403,6 +471,15 @@ dependencies = [
]

[[package]]
name = "form_urlencoded"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a62bc1cf6f830c2ec14a513a9fb124d0a213a629668a4186f329db21fe045652"
dependencies = [
 "percent-encoding",
]

[[package]]
name = "futures"
version = "0.3.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -458,7 +535,7 @@ checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72"
dependencies = [
 "proc-macro2",
 "quote",
 "syn",
 "syn 2.0.20",
]

[[package]]
@@ -550,6 +627,25 @@ dependencies = [
]

[[package]]
name = "hmac"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c49c37c09c17a53d937dfbb742eb3a961d65a994e6bcdcf37e7399d0cc8ab5e"
dependencies = [
 "digest 0.10.7",
]

[[package]]
name = "idna"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7d20d6b07bfbc108882d88ed8e37d39636dcc260e15e30c45e6ba089610b917c"
dependencies = [
 "unicode-bidi",
 "unicode-normalization",
]

[[package]]
name = "indexmap"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -671,6 +767,15 @@ dependencies = [
]

[[package]]
name = "md-5"
version = "0.10.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6365506850d44bff6e2fbcb5176cf63650e48bd45ef2fe2665ae1570e0f4b9ca"
dependencies = [
 "digest 0.10.7",
]

[[package]]
name = "md5"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -683,6 +788,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d"

[[package]]
name = "memoffset"
version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5de893c32cde5f383baa4c04c5d6dbdd735cfd4a794b0debdb2bb1b421da5ff4"
dependencies = [
 "autocfg",
]

[[package]]
name = "miniz_oxide"
version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -703,6 +817,20 @@ dependencies = [
]

[[package]]
name = "nix"
version = "0.26.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bfdda3d196821d6af13126e40375cdf7da646a96114af134d5f417a9a1dc8e1a"
dependencies = [
 "bitflags",
 "cfg-if",
 "libc",
 "memoffset",
 "pin-utils",
 "static_assertions",
]

[[package]]
name = "nu-ansi-term"
version = "0.46.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -812,7 +940,7 @@ checksum = "d95f5254224e617595d2cc3cc73ff0a5eaf2637519e25f03388154e9378b6ffa"
dependencies = [
 "base64ct",
 "crypto-mac",
 "hmac",
 "hmac 0.11.0",
 "password-hash",
 "sha2 0.9.9",
]
@@ -827,6 +955,30 @@ dependencies = [
]

[[package]]
name = "percent-encoding"
version = "2.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b2a4787296e9989611394c33f193f676704af1686e70b8f8033ab5ba9a35a94"

[[package]]
name = "phf"
version = "0.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ade2d8b8f33c7333b51bcf0428d37e217e9f32192ae4772156f65063b8ce03dc"
dependencies = [
 "phf_shared",
]

[[package]]
name = "phf_shared"
version = "0.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "90fcb95eef784c2ac79119d1dd819e162b5da872ce6f3c3abe1e8ca1c082f72b"
dependencies = [
 "siphasher",
]

[[package]]
name = "pin-project-lite"
version = "0.2.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -839,7 +991,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"

[[package]]
name = "pisshoff"
name = "pisshoff-server"
version = "0.1.0"
dependencies = [
 "anyhow",
@@ -847,13 +999,14 @@ dependencies = [
 "fastrand",
 "futures",
 "itertools",
 "nix",
 "parking_lot",
 "pisshoff-types",
 "serde",
 "serde_json",
 "shlex",
 "thrussh",
 "thrussh-keys",
 "time",
 "tokio",
 "toml",
 "tracing",
@@ -862,12 +1015,75 @@ dependencies = [
]

[[package]]
name = "pisshoff-timescaledb-exporter"
version = "0.1.0"
dependencies = [
 "anyhow",
 "clap",
 "deadpool-postgres",
 "futures",
 "pisshoff-types",
 "refinery",
 "serde",
 "serde_json",
 "tokio",
 "tokio-postgres",
 "tokio-util",
 "toml",
 "tracing",
 "tracing-subscriber",
]

[[package]]
name = "pisshoff-types"
version = "0.1.0"
dependencies = [
 "serde",
 "strum",
 "time",
 "uuid",
]

[[package]]
name = "pkg-config"
version = "0.3.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "26072860ba924cbfa98ea39c8c19b4dd6a4a25423dbdf219c1eca91aa0cf6964"

[[package]]
name = "postgres-protocol"
version = "0.6.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "78b7fa9f396f51dffd61546fd8573ee20592287996568e6175ceb0f8699ad75d"
dependencies = [
 "base64",
 "byteorder",
 "bytes",
 "fallible-iterator",
 "hmac 0.12.1",
 "md-5",
 "memchr",
 "rand",
 "sha2 0.10.7",
 "stringprep",
]

[[package]]
name = "postgres-types"
version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f028f05971fe20f512bcc679e2c10227e57809a3af86a7606304435bc8896cd6"
dependencies = [
 "bytes",
 "fallible-iterator",
 "postgres-protocol",
 "serde",
 "serde_json",
 "time",
 "uuid",
]

[[package]]
name = "ppv-lite86"
version = "0.2.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -951,11 +1167,58 @@ dependencies = [
]

[[package]]
name = "refinery"
version = "0.8.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cdb0436d0dd7bd8d4fce1e828751fa79742b08e35f27cfea7546f8a322b5ef24"
dependencies = [
 "refinery-core",
 "refinery-macros",
]

[[package]]
name = "refinery-core"
version = "0.8.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "19206547cd047e8f4dfa6b20c30d3ecaf24be05841b6aa0aa926a47a3d0662bb"
dependencies = [
 "async-trait",
 "cfg-if",
 "lazy_static",
 "log",
 "regex",
 "serde",
 "siphasher",
 "thiserror",
 "time",
 "tokio",
 "tokio-postgres",
 "toml",
 "url",
 "walkdir",
]

[[package]]
name = "refinery-macros"
version = "0.8.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d94d4b9241859ba19eaa5c04c86e782eb3aa0aae2c5868e0cfa90c856e58a174"
dependencies = [
 "proc-macro2",
 "quote",
 "refinery-core",
 "regex",
 "syn 2.0.20",
]

[[package]]
name = "regex"
version = "1.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d0ab3ca65655bb1e41f2a8c8cd662eb4fb035e67c3f78da1d61dffe89d07300f"
dependencies = [
 "aho-corasick",
 "memchr",
 "regex-syntax 0.7.2",
]

@@ -981,6 +1244,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "436b050e76ed2903236f032a59761c1eb99e1b0aead2c257922771dab1fc8c78"

[[package]]
name = "retain_mut"
version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4389f1d5789befaf6029ebd9f7dac4af7f7e3d61b69d4f30e2ac02b57e7712b0"

[[package]]
name = "rustix"
version = "0.37.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -995,6 +1264,12 @@ dependencies = [
]

[[package]]
name = "rustversion"
version = "1.0.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4f3208ce4d8448b3f3e7d168a73f5e0c43a61e32930de3bceeccedb388b6bf06"

[[package]]
name = "ryu"
version = "1.0.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1032,7 +1307,7 @@ checksum = "d9735b638ccc51c28bf6914d90a2e9725b377144fc612c49a611fddd1b631d68"
dependencies = [
 "proc-macro2",
 "quote",
 "syn",
 "syn 2.0.20",
]

[[package]]
@@ -1104,6 +1379,12 @@ dependencies = [
]

[[package]]
name = "siphasher"
version = "0.3.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7bd3e3206899af3f8b12af284fafc038cc1dc2b41d1b89dd17297221c5d225de"

[[package]]
name = "slab"
version = "0.4.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1129,12 +1410,60 @@ dependencies = [
]

[[package]]
name = "socket2"
version = "0.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2538b18701741680e0322a2302176d3253a35388e2e62f172f64f4f16605f877"
dependencies = [
 "libc",
 "windows-sys",
]

[[package]]
name = "static_assertions"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f"

[[package]]
name = "stringprep"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ee348cb74b87454fff4b551cbf727025810a004f88aeacae7f85b87f4e9a1c1"
dependencies = [
 "unicode-bidi",
 "unicode-normalization",
]

[[package]]
name = "strsim"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623"

[[package]]
name = "strum"
version = "0.24.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "063e6045c0e62079840579a7e47a355ae92f60eb74daaf156fb1e84ba164e63f"
dependencies = [
 "strum_macros",
]

[[package]]
name = "strum_macros"
version = "0.24.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e385be0d24f186b4ce2f9982191e7101bb737312ad61c1f2f984f34bcf85d59"
dependencies = [
 "heck",
 "proc-macro2",
 "quote",
 "rustversion",
 "syn 1.0.109",
]

[[package]]
name = "subtle"
version = "2.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1142,6 +1471,17 @@ checksum = "6bdef32e8150c2a081110b42772ffe7d7c9032b606bc226c8260fd97e0976601"

[[package]]
name = "syn"
version = "1.0.109"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "72b64191b275b66ffe2469e8af2c1cfe3bafa67b529ead792a6d0160888b4237"
dependencies = [
 "proc-macro2",
 "quote",
 "unicode-ident",
]

[[package]]
name = "syn"
version = "2.0.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fcb8d4cebc40aa517dfb69618fa647a346562e67228e2236ae0042ee6ac14775"
@@ -1168,7 +1508,7 @@ checksum = "f9456a42c5b0d803c8cd86e73dd7cc9edd429499f37a3550d286d5e86720569f"
dependencies = [
 "proc-macro2",
 "quote",
 "syn",
 "syn 2.0.20",
]

[[package]]
@@ -1216,7 +1556,7 @@ dependencies = [
 "data-encoding",
 "dirs",
 "futures",
 "hmac",
 "hmac 0.11.0",
 "log",
 "md5",
 "num-bigint",
@@ -1274,6 +1614,21 @@ dependencies = [
]

[[package]]
name = "tinyvec"
version = "1.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "87cc5ceb3875bb20c2890005a4e226a4651264a5c75edb2421b52861a0a0cb50"
dependencies = [
 "tinyvec_macros",
]

[[package]]
name = "tinyvec_macros"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"

[[package]]
name = "tokio"
version = "1.28.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1287,7 +1642,7 @@ dependencies = [
 "parking_lot",
 "pin-project-lite",
 "signal-hook-registry",
 "socket2",
 "socket2 0.4.9",
 "tokio-macros",
 "windows-sys",
]
@@ -1300,7 +1655,31 @@ checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e"
dependencies = [
 "proc-macro2",
 "quote",
 "syn",
 "syn 2.0.20",
]

[[package]]
name = "tokio-postgres"
version = "0.7.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e89f6234aa8fd43779746012fcf53603cdb91fdd8399aa0de868c2d56b6dde1"
dependencies = [
 "async-trait",
 "byteorder",
 "bytes",
 "fallible-iterator",
 "futures-channel",
 "futures-util",
 "log",
 "parking_lot",
 "percent-encoding",
 "phf",
 "pin-project-lite",
 "postgres-protocol",
 "postgres-types",
 "socket2 0.5.3",
 "tokio",
 "tokio-util",
]

[[package]]
@@ -1315,6 +1694,20 @@ dependencies = [
]

[[package]]
name = "tokio-util"
version = "0.7.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "806fe8c2c87eccc8b3267cbae29ed3ab2d0bd37fca70ab622e46aaa9375ddb7d"
dependencies = [
 "bytes",
 "futures-core",
 "futures-sink",
 "pin-project-lite",
 "tokio",
 "tracing",
]

[[package]]
name = "toml"
version = "0.7.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1368,7 +1761,7 @@ checksum = "5f4f31f56159e98206da9efd823404b79b6ef3143b4a7ab76e67b1751b25a4ab"
dependencies = [
 "proc-macro2",
 "quote",
 "syn",
 "syn 2.0.20",
]

[[package]]
@@ -1417,12 +1810,38 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "497961ef93d974e23eb6f433eb5fe1b7930b659f06d12dec6fc44a8f554c0bba"

[[package]]
name = "unicode-bidi"
version = "0.3.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "92888ba5573ff080736b3648696b70cafad7d250551175acbaa4e0385b3e1460"

[[package]]
name = "unicode-ident"
version = "1.0.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b15811caf2415fb889178633e7724bad2509101cde276048e013b9def5e51fa0"

[[package]]
name = "unicode-normalization"
version = "0.1.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5c5713f0fc4b5db668a2ac63cdb7bb4469d8c9fed047b1d0292cc7b0ce2ba921"
dependencies = [
 "tinyvec",
]

[[package]]
name = "url"
version = "2.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "50bff7831e19200a85b17131d085c25d7811bc4e186efdaf54bbd132994a88cb"
dependencies = [
 "form_urlencoded",
 "idna",
 "percent-encoding",
]

[[package]]
name = "utf8parse"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
diff --git a/Cargo.toml b/Cargo.toml
index b666354..bd808b4 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,28 +1,9 @@
[package]
name = "pisshoff"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
anyhow = "1.0"
clap = { version = "4.3", features = ["derive", "env", "cargo"] }
futures = "0.3"
parking_lot = "0.12"
fastrand = "1.9"
itertools = "0.10"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
shlex = "1.1"
thrussh = "0.34"
thrussh-keys = "0.22"
time = { version = "0.3", features = ["serde", "formatting"] }
tokio = { version = "1.28", features = ["full"] }
toml = "0.7"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
uuid = { version = "1.3", features = ["v4", "serde"] }
[workspace]
members = [
    "pisshoff-server",
    "pisshoff-timescaledb-exporter",
    "pisshoff-types"
]

[patch."crates-io"]
thrussh = { git = "ssh://git@github.com/JordanForks/thrussh" }
diff --git a/config.toml b/config.toml
deleted file mode 100644
index b8c834e..0000000
--- a/config.toml
+++ /dev/null
@@ -1,10 +0,0 @@
# Address for the server to listen on.
listen-address = "127.0.0.1:2233"

# The probability that an authentication attempt will succeed, once a given password
# has been accepted once - it will be accepted for the rest of the lifetime of the
# instance.
access-probability = 0.2

# Path of the file to write audit logs to.
audit-output-file = "audit.jsonl"
diff --git a/flake.nix b/flake.nix
index 4d35e92..7cb88b1 100644
--- a/flake.nix
+++ b/flake.nix
@@ -14,33 +14,34 @@
      {
        formatter = nixpkgs.legacyPackages."${system}".nixpkgs-fmt;

        defaultPackage = naersk-lib.buildPackage {
        packages.default = naersk-lib.buildPackage {
          src = ./.;
          nativeBuildInputs = with pkgs; [ pkg-config ];
          buildInputs = with pkgs; [ libsodium ];
        };
        devShell = with pkgs; mkShell {

        devShells.default = with pkgs; mkShell {
          buildInputs = [ cargo rustc rustfmt pre-commit rustPackages.clippy ];
          RUST_SRC_PATH = rustPlatform.rustLibSrc;
        };

        nixosModules.default = { config, lib, pkgs, ... }:
        nixosModules.pisshoff-server = { config, lib, pkgs, ... }:
          with lib;
          let
            cfg = config.services.pisshoff;
            cfg = config.services.pisshoff-server;
          in
          {
            options.services.pisshoff = {
              enable = mkEnableOption "pisshoff";
            options.services.pisshoff-server = {
              enable = mkEnableOption "pisshoff-server";
              settings = mkOption {
                type = (pkgs.formats.toml { }).type;
                default = { };
                description = "Specify the configuration for pisshoff in Nix";
                description = "Specify the configuration for pisshoff-server in Nix";
              };
            };

            config = mkIf cfg.enable {
              systemd.services.pisshoff = {
              systemd.services.pisshoff-server = {
                enable = true;
                wantedBy = [ "multi-user.target" ];
                after = [ "network-online.target" ];
@@ -51,7 +52,7 @@
                  in
                  {
                    Type = "exec";
                    ExecStart = "${self.defaultPackage."${system}"}/bin/pisshoff -c \"${conf}\"";
                    ExecStart = "${self.packages."${system}".default}/bin/pisshoff-server -c \"${conf}\"";
                    ExecReload = "${pkgs.coreutils}/bin/kill -HUP $MAINPID";
                    Restart = "on-failure";

@@ -84,7 +85,7 @@
                  };
              };

              services.logrotate.settings.pisshoff = {
              services.logrotate.settings.pisshoff-server = {
                files = "/var/log/pisshoff/audit.log";
                rotate = 31;
                frequency = "daily";
@@ -96,5 +97,68 @@
              };
            };
          };

        nixosModules.pisshoff-timescaledb-exporter = { config, lib, pkgs, ... }:
          with lib;
          let
            cfg = config.services.pisshoff-timescaledb-exporter;
          in
          {
            options.services.pisshoff-timescaledb-exporter = {
              enable = mkEnableOption "pisshoff-timescaledb-exporter";
              settings = mkOption {
                type = (pkgs.formats.toml { }).type;
                default = { };
                description = "Specify the configuration for pisshoff-timescaledb-exporter in Nix";
              };
            };

            config = mkIf cfg.enable {
              systemd.services.pisshoff-timescaledb-exporter = {
                enable = true;
                wantedBy = [ "multi-user.target" ];
                after = [ "network-online.target" ];
                serviceConfig =
                  let
                    format = pkgs.formats.toml { };
                    conf = format.generate "pisshoff.toml" cfg.settings;
                  in
                  {
                    Type = "exec";
                    ExecStart = "${self.packages."${system}".default}/bin/pisshoff-timescaledb-exporter -c \"${conf}\"";
                    ExecReload = "${pkgs.coreutils}/bin/kill -HUP $MAINPID";
                    Restart = "on-failure";

                    RuntimeDirectory = "pisshoff-timescaledb-exporter";
                    LogsDirectory = "pisshoff-timescaledb-exporter";
                    CapabilityBoundingSet = "";
                    NoNewPrivileges = true;
                    PrivateDevices = true;
                    PrivateTmp = true;
                    PrivateUsers = true;
                    PrivateMounts = true;
                    ProtectHome = true;
                    ProtectClock = true;
                    ProtectProc = "invisible";
                    ProcSubset = "pid";
                    ProtectKernelLogs = true;
                    ProtectKernelModules = true;
                    ProtectKernelTunables = true;
                    ProtectControlGroups = true;
                    ProtectHostname = true;
                    ProtectSystem = "strict";
                    RestrictSUIDSGID = true;
                    RestrictRealtime = true;
                    RestrictNamespaces = true;
                    LockPersonality = true;
                    RemoveIPC = true;
                    MemoryDenyWriteExecute = true;
                    DynamicUser = true;
                    RestrictAddressFamilies = [ "AF_UNIX" "AF_INET" "AF_INET6" ];
                    SystemCallFilter = [ "@system-service" "~@privileged" ];
                  };
              };
            };
          };
      });
}
diff --git a/pisshoff-server/Cargo.toml b/pisshoff-server/Cargo.toml
new file mode 100644
index 0000000..11445b6
--- /dev/null
+++ b/pisshoff-server/Cargo.toml
@@ -0,0 +1,27 @@
[package]
name = "pisshoff-server"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
pisshoff-types = { path = "../pisshoff-types" }

anyhow = "1.0"
clap = { version = "4.3", features = ["derive", "env", "cargo"] }
futures = "0.3"
parking_lot = "0.12"
fastrand = "1.9"
itertools = "0.10"
nix = { version = "0.26", features = ["hostname"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
shlex = "1.1"
thrussh = "0.34"
thrussh-keys = "0.22"
tokio = { version = "1.28", features = ["full"] }
toml = "0.7"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
uuid = { version = "1.3", features = ["v4", "serde"] }
diff --git a/pisshoff-server/config.toml b/pisshoff-server/config.toml
new file mode 100644
index 0000000..b8c834e
--- /dev/null
+++ b/pisshoff-server/config.toml
@@ -0,0 +1,10 @@
# Address for the server to listen on.
listen-address = "127.0.0.1:2233"

# The probability that an authentication attempt will succeed, once a given password
# has been accepted once - it will be accepted for the rest of the lifetime of the
# instance.
access-probability = 0.2

# Path of the file to write audit logs to.
audit-output-file = "audit.jsonl"
diff --git a/pisshoff-server/src/audit.rs b/pisshoff-server/src/audit.rs
new file mode 100644
index 0000000..7ce2055
--- /dev/null
+++ b/pisshoff-server/src/audit.rs
@@ -0,0 +1,76 @@
use crate::config::Config;
pub use pisshoff_types::audit::*;
use std::{io::ErrorKind, sync::Arc, time::Duration};
use tokio::{
    fs::OpenOptions,
    io::{AsyncWriteExt, BufWriter},
    sync::{oneshot, watch},
    task::JoinHandle,
};
use tracing::{debug, info};

pub fn start_audit_writer(
    config: Arc<Config>,
    mut reload: watch::Receiver<()>,
    mut shutdown_recv: oneshot::Receiver<()>,
) -> (
    tokio::sync::mpsc::UnboundedSender<AuditLog>,
    JoinHandle<Result<(), std::io::Error>>,
) {
    let (send, mut recv) = tokio::sync::mpsc::unbounded_channel();

    let handle = tokio::spawn(async move {
        let open_writer = || async {
            let file = OpenOptions::default()
                .create(true)
                .append(true)
                .open(&config.audit_output_file)
                .await?;
            Ok::<_, std::io::Error>(BufWriter::new(file))
        };

        let mut writer = open_writer().await?;
        let mut shutdown = false;

        while !shutdown {
            tokio::select! {
                log = recv.recv() => {
                    match log {
                        Some(log) => {
                            let log = serde_json::to_vec(&log)
                                .map_err(|e| std::io::Error::new(ErrorKind::Other, e))?;
                            writer.write_all(&log).await?;
                            writer.write_all("\n".as_bytes()).await?;
                        }
                        None => {
                            shutdown = true;
                        }
                    }
                }
                _ = &mut shutdown_recv => {
                    shutdown = true;
                }
                _ = tokio::time::sleep(Duration::from_secs(5)), if !writer.buffer().is_empty() => {
                    debug!("Flushing audits to disk");
                    writer.flush().await?;
                }
                Ok(()) = reload.changed() => {
                    info!("Flushing audits to disk");
                    writer.flush().await?;

                    info!("Reopening handle to log file");
                    writer = open_writer().await?;

                    info!("Successfully re-opened log file");
                }
                else => break,
            }
        }

        writer.flush().await?;

        Ok(())
    });

    (send, handle)
}
diff --git a/pisshoff-server/src/command.rs b/pisshoff-server/src/command.rs
new file mode 100644
index 0000000..7929f7a
--- /dev/null
+++ b/pisshoff-server/src/command.rs
@@ -0,0 +1,51 @@
use itertools::Itertools;
use std::{f32, str::FromStr, time::Duration};
use thrussh::{server::Session, ChannelId};

pub async fn run_command(args: &[String], channel: ChannelId, session: &mut Session) {
    let Some(command) = args.get(0) else {
        return;
    };

    match command.as_str() {
        "echo" => {
            session.data(
                channel,
                format!("{}\n", args.iter().skip(1).join(" ")).into(),
            );
        }
        "whoami" => {
            // TODO: grab "logged in" user
            session.data(channel, "root\n".to_string().into());
        }
        "pwd" => {
            // TODO: mock FHS
            session.data(channel, "/root\n".to_string().into());
        }
        "ls" => {
            // pretend /root is empty until we mock the FHS
        }
        "exit" => {
            let exit_status = args
                .get(1)
                .map(String::as_str)
                .map_or(Ok(0), u32::from_str)
                .unwrap_or(2);

            session.exit_status_request(channel, exit_status);
            session.close(channel);
        }
        "sleep" => {
            if let Some(Ok(secs)) = args.get(1).map(String::as_str).map(f32::from_str) {
                tokio::time::sleep(Duration::from_secs_f32(secs)).await;
            }
        }
        other => {
            // TODO: fix stderr displaying out of order
            session.data(
                channel,
                format!("bash: {other}: command not found\n").into(),
            );
        }
    }
}
diff --git a/pisshoff-server/src/config.rs b/pisshoff-server/src/config.rs
new file mode 100644
index 0000000..753d8eb
--- /dev/null
+++ b/pisshoff-server/src/config.rs
@@ -0,0 +1,72 @@
use clap::Parser;
use serde::{de::DeserializeOwned, Deserialize};
use std::path::PathBuf;
use std::sync::Arc;
use std::{io::ErrorKind, net::SocketAddr};

/// Parser for command line arguments, these arguments can also be passed via capitalised env vars
/// of the same name.
#[derive(Parser)]
#[command(author, version, about, long_about = None)]
pub struct Args {
    #[arg(short, long, env, value_parser = load_config::<Config>)]
    pub config: Arc<Config>,
    #[arg(short, long, action = clap::ArgAction::Count)]
    pub verbose: u8,
}

impl Args {
    pub fn verbosity(&self) -> &'static str {
        match self.verbose {
            0 => "info",
            1 => "debug,thrussh=info",
            2 => "debug",
            _ => "trace",
        }
    }
}

#[derive(Deserialize, Clone)]
#[serde(rename_all = "kebab-case")]
pub struct Config {
    /// Address for the server to listen on.
    #[serde(default = "Config::default_listen_address")]
    pub listen_address: SocketAddr,
    /// The probability that an authentication attempt will succeed, once a given password
    /// has been accepted once - it will be accepted for the rest of the lifetime of the
    /// instance.
    #[serde(default = "Config::default_access_probability")]
    pub access_probability: f64,
    /// Path of the file to write audit logs to.
    #[serde(default = "Config::default_audit_output_file")]
    pub audit_output_file: PathBuf,
    /// The server ID string sent at the beginning of the SSH connection.
    #[serde(default = "Config::default_server_id")]
    pub server_id: String,
}

impl Config {
    fn default_listen_address() -> SocketAddr {
        "0.0.0.0:22".parse().unwrap()
    }

    fn default_access_probability() -> f64 {
        0.2
    }

    fn default_audit_output_file() -> PathBuf {
        "/var/log/pisshoff/audit.log".parse().unwrap()
    }

    fn default_server_id() -> String {
        "SSH-2.0-OpenSSH_9.3".to_string()
    }
}

fn load_config<T: DeserializeOwned>(path: &str) -> Result<Arc<T>, std::io::Error> {
    let file = std::fs::read_to_string(path)?;

    toml::from_str(&file)
        .map(Arc::new)
        .map_err(|e| std::io::Error::new(ErrorKind::Other, e))
}
diff --git a/pisshoff-server/src/main.rs b/pisshoff-server/src/main.rs
new file mode 100644
index 0000000..379a4c4
--- /dev/null
+++ b/pisshoff-server/src/main.rs
@@ -0,0 +1,110 @@
#![deny(clippy::pedantic)]
#![allow(clippy::module_name_repetitions)]

use crate::{config::Args, server::Server};
use anyhow::anyhow;
use clap::Parser;
use futures::FutureExt;
use std::sync::Arc;
use thrussh::MethodSet;
use tokio::{
    signal::unix::SignalKind,
    sync::{oneshot, watch},
};
use tracing::{error, info};
use tracing_subscriber::EnvFilter;

mod audit;
mod command;
mod config;
mod server;
mod state;

#[tokio::main]
async fn main() {
    if let Err(e) = run().await {
        error!("Failed to run {}: {}", env!("CARGO_CRATE_NAME"), e);
        std::process::exit(1);
    }
}

async fn run() -> anyhow::Result<()> {
    let args = Args::parse();

    std::env::set_var("RUST_LOG", args.verbosity());

    tracing_subscriber::fmt()
        .with_env_filter(EnvFilter::from_default_env())
        .init();

    info!(
        "{} listening on {}",
        env!("CARGO_CRATE_NAME"),
        args.config.listen_address
    );

    let hostname = Box::leak(
        nix::unistd::gethostname()?
            .into_string()
            .map_err(|_| anyhow!("invalid hostname"))?
            .into_boxed_str(),
    );
    let keys = vec![thrussh_keys::key::KeyPair::generate_ed25519().unwrap()];

    let thrussh_config = Arc::new(thrussh::server::Config {
        server_id: args.config.server_id.to_string(),
        methods: MethodSet::PASSWORD | MethodSet::PUBLICKEY | MethodSet::KEYBOARD_INTERACTIVE,
        keys,
        auth_rejection_time: std::time::Duration::from_secs(1),
        ..thrussh::server::Config::default()
    });

    let (reload_send, reload_recv) = watch::channel(());
    let (shutdown_send, shutdown_recv) = oneshot::channel();

    let (audit_send, audit_handle) =
        audit::start_audit_writer(args.config.clone(), reload_recv, shutdown_recv);
    let mut audit_handle = audit_handle.fuse();

    let server = Server::new(hostname, args.config.clone(), audit_send);
    let listen_address = args.config.listen_address.to_string();

    // TODO: needs clean shutdowns on clients
    let fut = thrussh::server::run(thrussh_config, &listen_address, server);

    let shutdown_watcher = watch_for_shutdown(shutdown_send);
    let reload_watcher = watch_for_reloads(reload_send);

    tokio::select! {
        res = fut => res?,
        res = &mut audit_handle => res??,
        res = shutdown_watcher => res?,
        res = reload_watcher => res?,
    }

    info!("Finishing audit log writes");
    audit_handle.await??;
    info!("Audit log writes finished");

    Ok(())
}

async fn watch_for_shutdown(send: oneshot::Sender<()>) -> Result<(), anyhow::Error> {
    tokio::signal::ctrl_c().await?;
    info!("Received ctrl-c, initiating shutdown");

    let _res = send.send(());

    Ok(())
}

async fn watch_for_reloads(send: watch::Sender<()>) -> Result<(), anyhow::Error> {
    let mut signal = tokio::signal::unix::signal(SignalKind::hangup())?;

    while let Some(()) = signal.recv().await {
        info!("Received SIGHUP, broadcasting reload");
        let _res = send.send(());
    }

    Ok(())
}
diff --git a/pisshoff-server/src/server.rs b/pisshoff-server/src/server.rs
new file mode 100644
index 0000000..5e21685
--- /dev/null
+++ b/pisshoff-server/src/server.rs
@@ -0,0 +1,620 @@
use crate::audit::{
    ExecCommandEvent, SignalEvent, SubsystemRequestEvent, TcpIpForwardEvent, WindowAdjustedEvent,
    WindowChangeRequestEvent,
};
use crate::{
    audit::{
        AuditLog, AuditLogAction, LoginAttemptEvent, OpenDirectTcpIpEvent, OpenX11Event,
        PtyRequestEvent, X11RequestEvent,
    },
    command::run_command,
    config::Config,
    state::State,
};
use futures::{
    future::{BoxFuture, InspectErr},
    FutureExt, TryFutureExt,
};
use std::{
    borrow::Cow,
    future::Future,
    net::SocketAddr,
    pin::Pin,
    sync::Arc,
    task::{Context, Poll},
};
use thrussh::{
    server::{Auth, Response, Session},
    ChannelId, Pty, Sig,
};
use thrussh_keys::key::PublicKey;
use tokio::sync::mpsc::UnboundedSender;
use tracing::{debug, error, info, info_span, instrument::Instrumented, Instrument, Span};

pub static KEYBOARD_INTERACTIVE_PROMPT: &[(Cow<'static, str>, bool)] =
    &[(Cow::Borrowed("Password: "), false)];
pub const SHELL_PROMPT: &str = "bash-5.1$ ";

#[derive(Clone)]
pub struct Server {
    config: Arc<Config>,
    state: Arc<State>,
    hostname: &'static str,
    audit_send: UnboundedSender<AuditLog>,
}

impl Server {
    pub fn new(
        hostname: &'static str,
        config: Arc<Config>,
        audit_send: UnboundedSender<AuditLog>,
    ) -> Self {
        Self {
            config,
            hostname,
            state: Arc::new(State::default()),
            audit_send,
        }
    }
}

impl thrussh::server::Server for Server {
    type Handler = Connection;

    fn new(&mut self, peer_addr: Option<SocketAddr>) -> Self::Handler {
        let connection_id = uuid::Uuid::new_v4();

        Connection {
            span: info_span!("connection", ?peer_addr, %connection_id),
            server: self.clone(),
            audit_log: AuditLog {
                connection_id,
                host: Cow::Borrowed(self.hostname),
                peer_address: peer_addr,
                ..AuditLog::default()
            },
        }
    }
}

pub struct Connection {
    span: Span,
    server: Server,
    audit_log: AuditLog,
}

impl Connection {
    fn try_login(&mut self, user: &str, password: &str) -> bool {
        let res = if self
            .server
            .state
            .previously_accepted_passwords
            .seen(password)
        {
            info!(user, password, "Accepted login due to it being used before");
            true
        } else if fastrand::f64() <= self.server.config.access_probability {
            info!(user, password, "Accepted login randomly");
            self.server
                .state
                .previously_accepted_passwords
                .store(password);
            true
        } else {
            info!(?user, ?password, "Rejected login");
            false
        };

        self.audit_log.push_action(AuditLogAction::LoginAttempt(
            LoginAttemptEvent::UsernamePassword {
                username: Box::from(user),
                password: Box::from(password),
            },
        ));

        res
    }
}

impl thrussh::server::Handler for Connection {
    type Error = anyhow::Error;
    type FutureAuth = HandlerFuture<Auth>;
    type FutureUnit = HandlerFuture<Session>;
    type FutureBool =
        ServerFuture<Self::Error, BoxFuture<'static, Result<(Self, Session, bool), Self::Error>>>;

    fn finished_auth(self, auth: Auth) -> Self::FutureAuth {
        let span = info_span!(parent: &self.span, "finished_auth");
        futures::future::ok((self, auth)).boxed().wrap(span)
    }

    fn finished_bool(self, b: bool, session: Session) -> Self::FutureBool {
        let span = info_span!(parent: &self.span, "finished_bool");
        let _entered = span.enter();

        futures::future::ok((self, session, b))
            .boxed()
            .wrap(Span::current())
    }

    fn finished(self, session: Session) -> Self::FutureUnit {
        let span = info_span!(parent: &self.span, "finished");
        let _entered = span.enter();

        futures::future::ok((self, session))
            .boxed()
            .wrap(Span::current())
    }

    fn auth_none(self, _user: &str) -> Self::FutureAuth {
        let span = info_span!(parent: &self.span, "auth_none");

        self.finished_auth(Auth::UnsupportedMethod)
            .boxed()
            .wrap(span)
    }

    fn auth_password(mut self, user: &str, password: &str) -> Self::FutureAuth {
        let span = info_span!(parent: &self.span, "auth_password");
        let _entered = span.enter();

        let res = if self.try_login(user, password) {
            Auth::Accept
        } else {
            Auth::Reject
        };

        self.finished_auth(res)
    }

    fn auth_publickey(mut self, _user: &str, public_key: &PublicKey) -> Self::FutureAuth {
        let span = info_span!(parent: &self.span, "auth_publickey");
        let _entered = span.enter();

        let kind = public_key.name();
        let fingerprint = public_key.fingerprint();

        self.audit_log
            .push_action(AuditLogAction::LoginAttempt(LoginAttemptEvent::PublicKey {
                kind: Cow::Borrowed(kind),
                fingerprint: Box::from(fingerprint),
            }));

        self.finished_auth(Auth::Reject)
            .boxed()
            .wrap(Span::current())
    }

    fn auth_keyboard_interactive(
        mut self,
        user: &str,
        _submethods: &str,
        mut response: Option<Response>,
    ) -> Self::FutureAuth {
        let span = info_span!(parent: &self.span, "auth_keyboard_interactive");
        let _entered = span.enter();

        let result = if let Some(password) = response
            .as_mut()
            .and_then(Response::next)
            .map(String::from_utf8_lossy)
        {
            if self.try_login(user, password.as_ref()) {
                Auth::Accept
            } else {
                Auth::Reject
            }
        } else {
            debug!("Client is attempting keyboard-interactive, obliging");

            Auth::Partial {
                name: "".into(),
                instructions: "".into(),
                prompts: KEYBOARD_INTERACTIVE_PROMPT.into(),
            }
        };

        self.finished_auth(result)
    }

    fn channel_close(self, channel: ChannelId, mut session: Session) -> Self::FutureUnit {
        let span = info_span!(parent: &self.span, "channel_close");
        let _entered = span.enter();

        session.channel_success(channel);
        self.finished(session).boxed().wrap(Span::current())
    }

    fn channel_eof(self, channel: ChannelId, mut session: Session) -> Self::FutureUnit {
        let span = info_span!(parent: &self.span, "channel_eof");
        let _entered = span.enter();

        info!("In here");

        session.channel_success(channel);
        self.finished(session).boxed().wrap(Span::current())
    }

    fn channel_open_session(self, channel: ChannelId, mut session: Session) -> Self::FutureUnit {
        let span = info_span!(parent: &self.span, "channel_open_session");
        let _entered = span.enter();

        session.channel_success(channel);
        self.finished(session).boxed().wrap(Span::current())
    }

    fn channel_open_x11(
        mut self,
        channel: ChannelId,
        originator_address: &str,
        originator_port: u32,
        mut session: Session,
    ) -> Self::FutureUnit {
        let span = info_span!(parent: &self.span, "channel_open_x11");
        let _entered = span.enter();

        self.audit_log
            .push_action(AuditLogAction::OpenX11(OpenX11Event {
                originator_address: Box::from(originator_address),
                originator_port,
            }));

        session.channel_failure(channel);
        self.finished(session).boxed().wrap(Span::current())
    }

    fn channel_open_direct_tcpip(
        mut self,
        channel: ChannelId,
        host_to_connect: &str,
        port_to_connect: u32,
        originator_address: &str,
        originator_port: u32,
        mut session: Session,
    ) -> Self::FutureUnit {
        let span = info_span!(parent: &self.span, "channel_open_direct_tcpip");
        let _entered = span.enter();

        self.audit_log
            .push_action(AuditLogAction::OpenDirectTcpIp(OpenDirectTcpIpEvent {
                host_to_connect: Box::from(host_to_connect),
                port_to_connect,
                originator_address: Box::from(originator_address),
                originator_port,
            }));

        session.channel_failure(channel);
        self.finished(session).boxed().wrap(Span::current())
    }

    fn data(mut self, channel: ChannelId, data: &[u8], mut session: Session) -> Self::FutureUnit {
        let span = info_span!(parent: &self.span, "data");
        let _entered = span.enter();

        let data = shlex::split(String::from_utf8_lossy(data).as_ref());

        async move {
            if let Some(args) = data {
                run_command(&args, channel, &mut session).await;
                self.audit_log
                    .push_action(AuditLogAction::ExecCommand(ExecCommandEvent {
                        args: Box::from(args),
                    }));
            }

            session.data(channel, SHELL_PROMPT.to_string().into());
            self.finished(session).await
        }
        .boxed()
        .wrap(Span::current())
    }

    fn extended_data(
        self,
        _channel: ChannelId,
        _code: u32,
        _data: &[u8],
        session: Session,
    ) -> Self::FutureUnit {
        let span = info_span!(parent: &self.span, "extended_data");
        let _entered = span.enter();

        self.finished(session).boxed().wrap(Span::current())
    }

    fn window_adjusted(
        mut self,
        _channel: ChannelId,
        new_window_size: usize,
        session: Session,
    ) -> Self::FutureUnit {
        let span = info_span!(parent: &self.span, "window_adjusted");
        let _entered = span.enter();

        self.audit_log
            .push_action(AuditLogAction::WindowAdjusted(WindowAdjustedEvent {
                new_size: new_window_size,
            }));

        self.finished(session).boxed().wrap(Span::current())
    }

    fn adjust_window(&mut self, _channel: ChannelId, current: u32) -> u32 {
        let span = info_span!(parent: &self.span, "adjust_window");
        let _entered = span.enter();

        current
    }

    fn pty_request(
        mut self,
        channel: ChannelId,
        term: &str,
        col_width: u32,
        row_height: u32,
        pix_width: u32,
        pix_height: u32,
        modes: &[(Pty, u32)],
        mut session: Session,
    ) -> Self::FutureUnit {
        let span = info_span!(parent: &self.span, "pty_request");
        let _entered = span.enter();

        self.audit_log
            .push_action(AuditLogAction::PtyRequest(PtyRequestEvent {
                term: Box::from(term),
                col_width,
                row_height,
                pix_width,
                pix_height,
                modes: Box::from(
                    modes
                        .iter()
                        .copied()
                        .map(|(pty, val)| (pty as u8, val))
                        .collect::<Vec<_>>(),
                ),
            }));

        session.channel_failure(channel);
        self.finished(session).boxed().wrap(Span::current())
    }

    fn x11_request(
        mut self,
        channel: ChannelId,
        single_connection: bool,
        x11_auth_protocol: &str,
        x11_auth_cookie: &str,
        x11_screen_number: u32,
        mut session: Session,
    ) -> Self::FutureUnit {
        let span = info_span!(parent: &self.span, "x11_request");
        let _entered = span.enter();

        self.audit_log
            .push_action(AuditLogAction::X11Request(X11RequestEvent {
                single_connection,
                x11_auth_protocol: Box::from(x11_auth_protocol),
                x11_auth_cookie: Box::from(x11_auth_cookie),
                x11_screen_number,
            }));

        session.channel_failure(channel);
        self.finished(session).boxed().wrap(Span::current())
    }

    fn env_request(
        mut self,
        channel: ChannelId,
        variable_name: &str,
        variable_value: &str,
        mut session: Session,
    ) -> Self::FutureUnit {
        let span = info_span!(parent: &self.span, "env_request");
        let _entered = span.enter();

        self.audit_log
            .environment_variables
            .push((Box::from(variable_name), Box::from(variable_value)));

        session.channel_success(channel);
        self.finished(session).boxed().wrap(Span::current())
    }

    fn shell_request(mut self, channel: ChannelId, mut session: Session) -> Self::FutureUnit {
        let span = info_span!(parent: &self.span, "shell_request");
        let _entered = span.enter();

        self.audit_log.push_action(AuditLogAction::ShellRequested);

        session.data(channel, SHELL_PROMPT.to_string().into());

        session.channel_success(channel);
        self.finished(session).boxed().wrap(Span::current())
    }

    fn exec_request(
        mut self,
        channel: ChannelId,
        data: &[u8],
        mut session: Session,
    ) -> Self::FutureUnit {
        let span = info_span!(parent: &self.span, "exec_request");
        let _entered = span.enter();

        let data = shlex::split(String::from_utf8_lossy(data).as_ref());

        async move {
            if let Some(args) = data {
                run_command(&args, channel, &mut session).await;
                self.audit_log
                    .push_action(AuditLogAction::ExecCommand(ExecCommandEvent {
                        args: Box::from(args),
                    }));

                session.channel_success(channel);
            } else {
                session.channel_failure(channel);
            }

            self.finished(session).await
        }
        .boxed()
        .wrap(Span::current())
    }

    fn subsystem_request(
        mut self,
        channel: ChannelId,
        name: &str,
        mut session: Session,
    ) -> Self::FutureUnit {
        let span = info_span!(parent: &self.span, "subsystem_request");
        let _entered = span.enter();

        self.audit_log
            .push_action(AuditLogAction::SubsystemRequest(SubsystemRequestEvent {
                name: Box::from(name),
            }));

        session.channel_failure(channel);
        self.finished(session).boxed().wrap(Span::current())
    }

    fn window_change_request(
        mut self,
        channel: ChannelId,
        col_width: u32,
        row_height: u32,
        pix_width: u32,
        pix_height: u32,
        mut session: Session,
    ) -> Self::FutureUnit {
        let span = info_span!(parent: &self.span, "window_change_request");
        let _entered = span.enter();

        self.audit_log
            .push_action(AuditLogAction::WindowChangeRequest(
                WindowChangeRequestEvent {
                    col_width,
                    row_height,
                    pix_width,
                    pix_height,
                },
            ));

        session.channel_success(channel);
        self.finished(session).boxed().wrap(Span::current())
    }

    fn signal(
        mut self,
        _channel: ChannelId,
        signal_name: Sig,
        session: Session,
    ) -> Self::FutureUnit {
        let span = info_span!(parent: &self.span, "signal");
        let _entered = span.enter();

        self.audit_log
            .push_action(AuditLogAction::Signal(SignalEvent {
                name: format!("{signal_name:?}").into(),
            }));

        self.finished(session).boxed().wrap(Span::current())
    }

    fn tcpip_forward(mut self, address: &str, port: u32, session: Session) -> Self::FutureBool {
        let span = info_span!(parent: &self.span, "tcpip_forward");
        let _entered = span.enter();

        self.audit_log
            .push_action(AuditLogAction::TcpIpForward(TcpIpForwardEvent {
                address: Box::from(address),
                port,
            }));

        self.finished_bool(false, session)
            .boxed()
            .wrap(Span::current())
    }

    fn cancel_tcpip_forward(
        mut self,
        address: &str,
        port: u32,
        session: Session,
    ) -> Self::FutureBool {
        let span = info_span!(parent: &self.span, "cancel_tcpip_forward");
        let _entered = span.enter();

        self.audit_log
            .push_action(AuditLogAction::CancelTcpIpForward(TcpIpForwardEvent {
                address: Box::from(address),
                port,
            }));

        self.finished_bool(false, session)
            .boxed()
            .wrap(Span::current())
    }
}

impl Drop for Connection {
    fn drop(&mut self) {
        let span = info_span!(parent: &self.span, "drop");
        let _entered = span.enter();

        info!("Connection closed");

        let _res = self
            .server
            .audit_send
            .send(std::mem::take(&mut self.audit_log));
    }
}

type HandlerResult<T> = Result<T, <Connection as thrussh::server::Handler>::Error>;
type HandlerFuture<T> = ServerFuture<
    <Connection as thrussh::server::Handler>::Error,
    BoxFuture<'static, HandlerResult<(Connection, T)>>,
>;

/// Wraps a future, providing logging and instrumentation. This provides a newtype over the future
/// (`ServerFuture`) in order to enforce usage within the `thrussh::server::Handler` impl.
pub trait WrapFuture: Sized {
    type Ok;
    type Err;

    fn wrap(self, span: Span) -> ServerFuture<Self::Err, Self>;
}

impl<T, F: Future<Output = Result<T, anyhow::Error>>> WrapFuture for F {
    type Ok = T;
    type Err = anyhow::Error;

    fn wrap(self, span: Span) -> ServerFuture<Self::Err, Self> {
        ServerFuture(
            self.inspect_err(log_err as fn(&anyhow::Error))
                .instrument(span),
        )
    }
}

/// Logs an error from a future result.
fn log_err(e: &anyhow::Error) {
    error!("Connection closed due to: {}", e);
}

/// A wrapped future, providing logging ad instrumentation.
#[allow(clippy::type_complexity)]
pub struct ServerFuture<E, F>(Instrumented<InspectErr<F, fn(&E)>>);

impl<T, E, F: Future<Output = Result<T, E>> + Unpin> Future for ServerFuture<E, F> {
    type Output = F::Output;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        Pin::new(&mut self.0).poll(cx)
    }
}
diff --git a/pisshoff-server/src/state.rs b/pisshoff-server/src/state.rs
new file mode 100644
index 0000000..0ade91a
--- /dev/null
+++ b/pisshoff-server/src/state.rs
@@ -0,0 +1,22 @@
use parking_lot::RwLock;
use std::collections::HashSet;

#[derive(Default)]
pub struct State {
    /// A list of passwords that have previously been accepted, and will forever be accepted
    /// to further attract the bear.
    pub previously_accepted_passwords: StoredPasswords,
}

#[derive(Default)]
pub struct StoredPasswords(RwLock<HashSet<Box<str>>>);

impl StoredPasswords {
    pub fn seen(&self, password: &str) -> bool {
        self.0.read().contains(password)
    }

    pub fn store(&self, password: &str) -> bool {
        self.0.write().insert(Box::from(password.to_string()))
    }
}
diff --git a/pisshoff-timescaledb-exporter/Cargo.toml b/pisshoff-timescaledb-exporter/Cargo.toml
new file mode 100644
index 0000000..fba62ce
--- /dev/null
+++ b/pisshoff-timescaledb-exporter/Cargo.toml
@@ -0,0 +1,23 @@
[package]
name = "pisshoff-timescaledb-exporter"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
pisshoff-types = { path = "../pisshoff-types" }

anyhow = "1.0"
clap = { version = "4.3", features = ["derive", "env", "cargo"] }
deadpool-postgres = { version = "0.10", features = ["rt_tokio_1", "serde"] }
futures = "0.3"
refinery = { version = "0.8", features = ["tokio-postgres"] }
tokio = { version = "1.28", features = ["full"] }
tokio-util = { version = "0.7", features = ["codec"] }
tokio-postgres = { version = "0.7", features = ["with-time-0_3", "with-uuid-1", "with-serde_json-1"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
toml = "0.7"
diff --git a/pisshoff-timescaledb-exporter/config.toml b/pisshoff-timescaledb-exporter/config.toml
new file mode 100644
index 0000000..be607c3
--- /dev/null
+++ b/pisshoff-timescaledb-exporter/config.toml
@@ -0,0 +1,7 @@
socket-path = "test.sock"

[pg]
user = "postgres"
dbname = "pisshoff"
host = "127.0.0.1"
port = 64601
diff --git a/pisshoff-timescaledb-exporter/migrations/V1__initial.sql b/pisshoff-timescaledb-exporter/migrations/V1__initial.sql
new file mode 100644
index 0000000..9d0eea6
--- /dev/null
+++ b/pisshoff-timescaledb-exporter/migrations/V1__initial.sql
@@ -0,0 +1,30 @@
CREATE TABLE audit (
    timestamp TIMESTAMPTZ NOT NULL,
    connection_id UUID NOT NULL,
    peer_address TEXT NOT NULL,
    host TEXT NOT NULL,
    UNIQUE(timestamp)
);

SELECT create_hypertable('audit', 'timestamp');

CREATE TABLE audit_environment_variables (
    connection_id uuid NOT NULL,
    name TEXT NOT NULL,
    value TEXT NOT NULL
);

CREATE INDEX audit_environment_variables_connection_id ON audit_environment_variables USING HASH (connection_id);
CREATE INDEX audit_environment_variables_name ON audit_environment_variables USING HASH (name);

CREATE TABLE audit_events (
    timestamp TIMESTAMPTZ NOT NULL,
    connection_id UUID NOT NULL,
    type TEXT NOT NULL,
    content JSONB
);

SELECT create_hypertable('audit_events', 'timestamp');

CREATE INDEX audit_events_connection_id ON audit_events USING HASH (connection_id);
CREATE INDEX audit_events_type ON audit_events USING HASH (type);
\ No newline at end of file
diff --git a/pisshoff-timescaledb-exporter/src/config.rs b/pisshoff-timescaledb-exporter/src/config.rs
new file mode 100644
index 0000000..8cb9e34
--- /dev/null
+++ b/pisshoff-timescaledb-exporter/src/config.rs
@@ -0,0 +1,39 @@
use clap::Parser;
use serde::{de::DeserializeOwned, Deserialize};
use std::{io::ErrorKind, path::PathBuf, sync::Arc};

/// Parser for command line arguments
#[derive(Parser)]
#[command(author, version, about, long_about = None)]
pub struct Args {
    #[arg(short, long, env, value_parser = load_config::<Config>)]
    pub config: Arc<Config>,
    #[arg(short, long, action = clap::ArgAction::Count)]
    pub verbose: u8,
}

impl Args {
    pub fn verbosity(&self) -> &'static str {
        match self.verbose {
            0 => "info",
            1 => "debug,thrussh=info",
            2 => "debug",
            _ => "trace",
        }
    }
}

#[derive(Deserialize)]
#[serde(rename_all = "kebab-case")]
pub struct Config {
    pub socket_path: PathBuf,
    pub pg: deadpool_postgres::Config,
}

fn load_config<T: DeserializeOwned>(path: &str) -> Result<Arc<T>, std::io::Error> {
    let file = std::fs::read_to_string(path)?;

    toml::from_str(&file)
        .map(Arc::new)
        .map_err(|e| std::io::Error::new(ErrorKind::Other, e))
}
diff --git a/pisshoff-timescaledb-exporter/src/main.rs b/pisshoff-timescaledb-exporter/src/main.rs
new file mode 100644
index 0000000..9cce26a
--- /dev/null
+++ b/pisshoff-timescaledb-exporter/src/main.rs
@@ -0,0 +1,156 @@
#![deny(clippy::pedantic)]
#![allow(clippy::module_name_repetitions)]

use crate::config::Args;
use clap::Parser;
use deadpool_postgres::{
    tokio_postgres::{NoTls, Statement, Transaction},
    GenericClient, Runtime,
};
use futures::{StreamExt, TryFutureExt};
use pisshoff_types::audit::{AuditLog, AuditLogEvent};
use std::sync::Arc;
use tokio::net::{UnixListener, UnixStream};
use tokio_util::codec::{Decoder, LinesCodec};
use tracing::{error, info};
use tracing_subscriber::EnvFilter;

mod config;

mod embedded {
    use refinery::embed_migrations;
    embed_migrations!();
}

pub struct Context {
    db: deadpool_postgres::Pool,
}

#[tokio::main]
async fn main() {
    if let Err(e) = run().await {
        error!("Failed to run {}: {}", env!("CARGO_CRATE_NAME"), e);
        std::process::exit(1);
    }
}

async fn run() -> anyhow::Result<()> {
    let args = Args::parse();

    std::env::set_var("RUST_LOG", args.verbosity());

    tracing_subscriber::fmt()
        .with_env_filter(EnvFilter::from_default_env())
        .init();

    let db = args.config.pg.create_pool(Some(Runtime::Tokio1), NoTls)?;
    let context = Arc::new(Context { db });

    embedded::migrations::runner()
        .run_async(&mut **context.db.get().await?)
        .await?;

    spawn_listener(&args, context).await
}

async fn spawn_listener(args: &Args, context: Arc<Context>) -> anyhow::Result<()> {
    let listener = UnixListener::bind(&args.config.socket_path)?;

    loop {
        let (stream, remote) = listener.accept().await?;

        info!(?remote, "Accepted incoming connection");

        let context = context.clone();

        tokio::spawn(async move {
            if let Err(e) = handle_connection(stream, context).await {
                error!("Connection failed: {e}");
            }
        });
    }
}

async fn handle_connection(stream: UnixStream, context: Arc<Context>) -> anyhow::Result<()> {
    let mut framed = LinesCodec::new().framed(stream);

    while let Some(line) = framed.next().await.transpose()? {
        let context = context.clone();

        tokio::spawn(
            ingest_log(context, line).inspect_err(|e| error!("Failed to ingest log: {e}")),
        );
    }

    Ok(())
}

async fn ingest_log(context: Arc<Context>, line: String) -> anyhow::Result<()> {
    let line: AuditLog = serde_json::from_str(&line)?;

    let Some(peer_address) = line.peer_address else {
        return Ok(());
    };

    let mut connection = context.db.get().await?;
    let tx = connection.transaction().await?;

    tokio::try_join!(
        async {
            tx
                .execute(
                    "INSERT INTO audit (timestamp, connection_id, peer_address, host) VALUES ($1, $2, $3, $4)",
                    &[&line.ts, &line.connection_id, &peer_address.to_string(), &line.host],
                )
                .await
                .map_err(anyhow::Error::from)
        },
        async {
            let prepared = tx.prepare("INSERT INTO audit_environment_variables (connection_id, name, value) VALUES ($1, $2, $3)").await?;

            futures::future::try_join_all(
                line.environment_variables
                    .iter()
                    .map(|(key, value)| async { tx.execute(&prepared, &[key, value]).await }),
            )
            .await
            .map_err(anyhow::Error::from)
        },
        async {
            let prepared = tx.prepare("INSERT INTO audit_events (timestamp, connection_id, type, content) VALUES ($1, $2, $3, $4)").await?;

            futures::future::try_join_all(
                line.events
                    .iter()
                    .map(|event| insert_event(&tx, &prepared, &line, event)),
            )
            .await
        }
    )?;

    tx.commit().await?;

    Ok(())
}

async fn insert_event(
    tx: &Transaction<'_>,
    prepared: &Statement,
    line: &AuditLog,
    event: &AuditLogEvent,
) -> anyhow::Result<()> {
    let ts = line.ts + event.start_offset;

    tx.execute(
        prepared,
        &[
            &ts,
            &line.connection_id,
            &<&'static str>::from(&event.action),
            &serde_json::to_value(&event.action)?,
        ],
    )
    .await?;

    Ok(())
}
diff --git a/pisshoff-types/Cargo.toml b/pisshoff-types/Cargo.toml
new file mode 100644
index 0000000..e224dc1
--- /dev/null
+++ b/pisshoff-types/Cargo.toml
@@ -0,0 +1,12 @@
[package]
name = "pisshoff-types"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
uuid = "1.3"
time = { version = "0.3", features = ["serde", "formatting", "parsing"] }
serde = { version = "1.0", features = ["derive"] }
strum = { version = "0.24", features = ["derive"] }
diff --git a/pisshoff-types/src/audit.rs b/pisshoff-types/src/audit.rs
new file mode 100644
index 0000000..392d01e
--- /dev/null
+++ b/pisshoff-types/src/audit.rs
@@ -0,0 +1,162 @@
use serde::{Deserialize, Serialize};
use std::borrow::Cow;
use std::{
    fmt::{Debug, Formatter},
    net::SocketAddr,
    time::{Duration, Instant},
};
use strum::IntoStaticStr;
use time::OffsetDateTime;
use uuid::Uuid;

#[derive(Serialize, Deserialize)]
pub struct AuditLog {
    pub connection_id: Uuid,
    #[serde(with = "time::serde::rfc3339")]
    pub ts: OffsetDateTime,
    pub peer_address: Option<SocketAddr>,
    pub host: Cow<'static, str>,
    #[serde(skip_serializing_if = "Vec::is_empty", default)]
    pub environment_variables: Vec<(Box<str>, Box<str>)>,
    pub events: Vec<AuditLogEvent>,
    #[serde(skip, default = "Instant::now")]
    pub start: Instant,
}

impl Default for AuditLog {
    fn default() -> Self {
        Self {
            connection_id: Uuid::default(),
            ts: OffsetDateTime::now_utc(),
            host: Cow::Borrowed(""),
            peer_address: None,
            environment_variables: vec![],
            events: vec![],
            start: Instant::now(),
        }
    }
}

impl Debug for AuditLog {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("AuditLog")
            .field("connection_id", &self.connection_id)
            .field("peer_address", &self.peer_address)
            .field("environment_variables", &self.environment_variables)
            .field("events", &self.events)
            .finish()
    }
}

impl AuditLog {
    pub fn push_action(&mut self, action: AuditLogAction) {
        self.events.push(AuditLogEvent {
            start_offset: self.start.elapsed(),
            action,
        });
    }
}

#[derive(Debug, Serialize, Deserialize)]
pub struct AuditLogEvent {
    pub start_offset: Duration,
    pub action: AuditLogAction,
}

#[derive(Debug, Serialize, Deserialize, IntoStaticStr)]
#[serde(tag = "type", rename_all = "kebab-case")]
#[strum(serialize_all = "kebab-case")]
pub enum AuditLogAction {
    LoginAttempt(LoginAttemptEvent),
    PtyRequest(PtyRequestEvent),
    X11Request(X11RequestEvent),
    OpenX11(OpenX11Event),
    OpenDirectTcpIp(OpenDirectTcpIpEvent),
    ExecCommand(ExecCommandEvent),
    WindowAdjusted(WindowAdjustedEvent),
    ShellRequested,
    SubsystemRequest(SubsystemRequestEvent),
    WindowChangeRequest(WindowChangeRequestEvent),
    Signal(SignalEvent),
    TcpIpForward(TcpIpForwardEvent),
    CancelTcpIpForward(TcpIpForwardEvent),
}

#[derive(Debug, Serialize, Deserialize)]
pub struct ExecCommandEvent {
    pub args: Box<[String]>,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct WindowAdjustedEvent {
    pub new_size: usize,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct SubsystemRequestEvent {
    pub name: Box<str>,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct SignalEvent {
    pub name: Box<str>,
}

#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "credential-type", rename_all = "kebab-case")]
pub enum LoginAttemptEvent {
    UsernamePassword {
        username: Box<str>,
        password: Box<str>,
    },
    PublicKey {
        kind: Cow<'static, str>,
        fingerprint: Box<str>,
    },
}

#[derive(Debug, Serialize, Deserialize)]
pub struct PtyRequestEvent {
    pub term: Box<str>,
    pub col_width: u32,
    pub row_height: u32,
    pub pix_width: u32,
    pub pix_height: u32,
    pub modes: Box<[(u8, u32)]>,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct OpenX11Event {
    pub originator_address: Box<str>,
    pub originator_port: u32,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct X11RequestEvent {
    pub single_connection: bool,
    pub x11_auth_protocol: Box<str>,
    pub x11_auth_cookie: Box<str>,
    pub x11_screen_number: u32,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct OpenDirectTcpIpEvent {
    pub host_to_connect: Box<str>,
    pub port_to_connect: u32,
    pub originator_address: Box<str>,
    pub originator_port: u32,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct WindowChangeRequestEvent {
    pub col_width: u32,
    pub row_height: u32,
    pub pix_width: u32,
    pub pix_height: u32,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct TcpIpForwardEvent {
    pub address: Box<str>,
    pub port: u32,
}
diff --git a/pisshoff-types/src/lib.rs b/pisshoff-types/src/lib.rs
new file mode 100644
index 0000000..1827072
--- /dev/null
+++ b/pisshoff-types/src/lib.rs
@@ -0,0 +1,4 @@
#![deny(clippy::pedantic)]
#![allow(clippy::module_name_repetitions)]

pub mod audit;
diff --git a/src/audit.rs b/src/audit.rs
deleted file mode 100644
index b340708..0000000
--- a/src/audit.rs
+++ /dev/null
@@ -1,233 +0,0 @@
use crate::config::Config;
use serde::Serialize;
use std::{
    fmt::{Debug, Formatter},
    io::ErrorKind,
    net::SocketAddr,
    sync::Arc,
    time::{Duration, Instant},
};
use time::OffsetDateTime;
use tokio::{
    fs::OpenOptions,
    io::{AsyncWriteExt, BufWriter},
    sync::{oneshot, watch},
    task::JoinHandle,
};
use tracing::{debug, info};
use uuid::Uuid;

pub fn start_audit_writer(
    config: Arc<Config>,
    mut reload: watch::Receiver<()>,
    mut shutdown_recv: oneshot::Receiver<()>,
) -> (
    tokio::sync::mpsc::UnboundedSender<AuditLog>,
    JoinHandle<Result<(), std::io::Error>>,
) {
    let (send, mut recv) = tokio::sync::mpsc::unbounded_channel();

    let handle = tokio::spawn(async move {
        let open_writer = || async {
            let file = OpenOptions::default()
                .create(true)
                .append(true)
                .open(&config.audit_output_file)
                .await?;
            Ok::<_, std::io::Error>(BufWriter::new(file))
        };

        let mut writer = open_writer().await?;
        let mut shutdown = false;

        while !shutdown {
            tokio::select! {
                log = recv.recv() => {
                    match log {
                        Some(log) => {
                            let log = serde_json::to_vec(&log)
                                .map_err(|e| std::io::Error::new(ErrorKind::Other, e))?;
                            writer.write_all(&log).await?;
                            writer.write_all("\n".as_bytes()).await?;
                        }
                        None => {
                            shutdown = true;
                        }
                    }
                }
                _ = &mut shutdown_recv => {
                    shutdown = true;
                }
                _ = tokio::time::sleep(Duration::from_secs(5)), if !writer.buffer().is_empty() => {
                    debug!("Flushing audits to disk");
                    writer.flush().await?;
                }
                Ok(()) = reload.changed() => {
                    info!("Flushing audits to disk");
                    writer.flush().await?;

                    info!("Reopening handle to log file");
                    writer = open_writer().await?;

                    info!("Successfully re-opened log file");
                }
                else => break,
            }
        }

        writer.flush().await?;

        Ok(())
    });

    (send, handle)
}

#[derive(Serialize)]
pub struct AuditLog {
    pub connection_id: Uuid,
    #[serde(with = "time::serde::rfc3339")]
    pub ts: OffsetDateTime,
    pub peer_address: Option<SocketAddr>,
    #[serde(skip_serializing_if = "Vec::is_empty")]
    pub environment_variables: Vec<(Box<str>, Box<str>)>,
    pub events: Vec<AuditLogEvent>,
    #[serde(skip)]
    pub start: Instant,
}

impl Default for AuditLog {
    fn default() -> Self {
        Self {
            connection_id: Uuid::default(),
            ts: OffsetDateTime::now_utc(),
            peer_address: None,
            environment_variables: vec![],
            events: vec![],
            start: Instant::now(),
        }
    }
}

impl Debug for AuditLog {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("AuditLog")
            .field("connection_id", &self.connection_id)
            .field("peer_address", &self.peer_address)
            .field("environment_variables", &self.environment_variables)
            .field("events", &self.events)
            .finish()
    }
}

impl AuditLog {
    pub fn push_action(&mut self, action: AuditLogAction) {
        self.events.push(AuditLogEvent {
            start_offset: self.start.elapsed(),
            action,
        });
    }
}

#[derive(Debug, Serialize)]
pub struct AuditLogEvent {
    pub start_offset: Duration,
    pub action: AuditLogAction,
}

#[derive(Debug, Serialize)]
#[serde(tag = "type", rename_all = "kebab-case")]
pub enum AuditLogAction {
    LoginAttempt(LoginAttemptEvent),
    PtyRequest(PtyRequestEvent),
    X11Request(X11RequestEvent),
    OpenX11(OpenX11Event),
    OpenDirectTcpIp(OpenDirectTcpIpEvent),
    ExecCommand(ExecCommandEvent),
    WindowAdjusted(WindowAdjustedEvent),
    ShellRequested,
    SubsystemRequest(SubsystemRequestEvent),
    WindowChangeRequest(WindowChangeRequestEvent),
    Signal(SignalEvent),
    TcpIpForward(TcpIpForwardEvent),
    CancelTcpIpForward(TcpIpForwardEvent),
}

#[derive(Debug, Serialize)]
pub struct ExecCommandEvent {
    pub args: Box<[String]>,
}

#[derive(Debug, Serialize)]
pub struct WindowAdjustedEvent {
    pub new_size: usize,
}

#[derive(Debug, Serialize)]
pub struct SubsystemRequestEvent {
    pub name: Box<str>,
}

#[derive(Debug, Serialize)]
pub struct SignalEvent {
    pub name: Box<str>,
}

#[derive(Debug, Serialize)]
#[serde(tag = "credential-type", rename_all = "kebab-case")]
pub enum LoginAttemptEvent {
    UsernamePassword {
        username: Box<str>,
        password: Box<str>,
    },
    PublicKey {
        kind: &'static str,
        fingerprint: Box<str>,
    },
}

#[derive(Debug, Serialize)]
pub struct PtyRequestEvent {
    pub term: Box<str>,
    pub col_width: u32,
    pub row_height: u32,
    pub pix_width: u32,
    pub pix_height: u32,
    pub modes: Box<[(u8, u32)]>,
}

#[derive(Debug, Serialize)]
pub struct OpenX11Event {
    pub originator_address: Box<str>,
    pub originator_port: u32,
}

#[derive(Debug, Serialize)]
pub struct X11RequestEvent {
    pub single_connection: bool,
    pub x11_auth_protocol: Box<str>,
    pub x11_auth_cookie: Box<str>,
    pub x11_screen_number: u32,
}

#[derive(Debug, Serialize)]
pub struct OpenDirectTcpIpEvent {
    pub host_to_connect: Box<str>,
    pub port_to_connect: u32,
    pub originator_address: Box<str>,
    pub originator_port: u32,
}

#[derive(Debug, Serialize)]
pub struct WindowChangeRequestEvent {
    pub col_width: u32,
    pub row_height: u32,
    pub pix_width: u32,
    pub pix_height: u32,
}

#[derive(Debug, Serialize)]
pub struct TcpIpForwardEvent {
    pub address: Box<str>,
    pub port: u32,
}
diff --git a/src/command.rs b/src/command.rs
deleted file mode 100644
index 7929f7a..0000000
--- a/src/command.rs
+++ /dev/null
@@ -1,51 +0,0 @@
use itertools::Itertools;
use std::{f32, str::FromStr, time::Duration};
use thrussh::{server::Session, ChannelId};

pub async fn run_command(args: &[String], channel: ChannelId, session: &mut Session) {
    let Some(command) = args.get(0) else {
        return;
    };

    match command.as_str() {
        "echo" => {
            session.data(
                channel,
                format!("{}\n", args.iter().skip(1).join(" ")).into(),
            );
        }
        "whoami" => {
            // TODO: grab "logged in" user
            session.data(channel, "root\n".to_string().into());
        }
        "pwd" => {
            // TODO: mock FHS
            session.data(channel, "/root\n".to_string().into());
        }
        "ls" => {
            // pretend /root is empty until we mock the FHS
        }
        "exit" => {
            let exit_status = args
                .get(1)
                .map(String::as_str)
                .map_or(Ok(0), u32::from_str)
                .unwrap_or(2);

            session.exit_status_request(channel, exit_status);
            session.close(channel);
        }
        "sleep" => {
            if let Some(Ok(secs)) = args.get(1).map(String::as_str).map(f32::from_str) {
                tokio::time::sleep(Duration::from_secs_f32(secs)).await;
            }
        }
        other => {
            // TODO: fix stderr displaying out of order
            session.data(
                channel,
                format!("bash: {other}: command not found\n").into(),
            );
        }
    }
}
diff --git a/src/config.rs b/src/config.rs
deleted file mode 100644
index 753d8eb..0000000
--- a/src/config.rs
+++ /dev/null
@@ -1,72 +0,0 @@
use clap::Parser;
use serde::{de::DeserializeOwned, Deserialize};
use std::path::PathBuf;
use std::sync::Arc;
use std::{io::ErrorKind, net::SocketAddr};

/// Parser for command line arguments, these arguments can also be passed via capitalised env vars
/// of the same name.
#[derive(Parser)]
#[command(author, version, about, long_about = None)]
pub struct Args {
    #[arg(short, long, env, value_parser = load_config::<Config>)]
    pub config: Arc<Config>,
    #[arg(short, long, action = clap::ArgAction::Count)]
    pub verbose: u8,
}

impl Args {
    pub fn verbosity(&self) -> &'static str {
        match self.verbose {
            0 => "info",
            1 => "debug,thrussh=info",
            2 => "debug",
            _ => "trace",
        }
    }
}

#[derive(Deserialize, Clone)]
#[serde(rename_all = "kebab-case")]
pub struct Config {
    /// Address for the server to listen on.
    #[serde(default = "Config::default_listen_address")]
    pub listen_address: SocketAddr,
    /// The probability that an authentication attempt will succeed, once a given password
    /// has been accepted once - it will be accepted for the rest of the lifetime of the
    /// instance.
    #[serde(default = "Config::default_access_probability")]
    pub access_probability: f64,
    /// Path of the file to write audit logs to.
    #[serde(default = "Config::default_audit_output_file")]
    pub audit_output_file: PathBuf,
    /// The server ID string sent at the beginning of the SSH connection.
    #[serde(default = "Config::default_server_id")]
    pub server_id: String,
}

impl Config {
    fn default_listen_address() -> SocketAddr {
        "0.0.0.0:22".parse().unwrap()
    }

    fn default_access_probability() -> f64 {
        0.2
    }

    fn default_audit_output_file() -> PathBuf {
        "/var/log/pisshoff/audit.log".parse().unwrap()
    }

    fn default_server_id() -> String {
        "SSH-2.0-OpenSSH_9.3".to_string()
    }
}

fn load_config<T: DeserializeOwned>(path: &str) -> Result<Arc<T>, std::io::Error> {
    let file = std::fs::read_to_string(path)?;

    toml::from_str(&file)
        .map(Arc::new)
        .map_err(|e| std::io::Error::new(ErrorKind::Other, e))
}
diff --git a/src/main.rs b/src/main.rs
deleted file mode 100644
index d53ac86..0000000
--- a/src/main.rs
+++ /dev/null
@@ -1,103 +0,0 @@
#![deny(clippy::pedantic)]
#![allow(clippy::module_name_repetitions)]

use crate::{config::Args, server::Server};
use clap::Parser;
use futures::FutureExt;
use std::sync::Arc;
use thrussh::MethodSet;
use tokio::{
    signal::unix::SignalKind,
    sync::{oneshot, watch},
};
use tracing::{error, info};
use tracing_subscriber::EnvFilter;

mod audit;
mod command;
mod config;
mod server;
mod state;

#[tokio::main]
async fn main() {
    if let Err(e) = run().await {
        error!("Failed to run {}: {}", env!("CARGO_CRATE_NAME"), e);
        std::process::exit(1);
    }
}

async fn run() -> anyhow::Result<()> {
    let args = Args::parse();

    std::env::set_var("RUST_LOG", args.verbosity());

    tracing_subscriber::fmt()
        .with_env_filter(EnvFilter::from_default_env())
        .init();

    info!(
        "{} listening on {}",
        env!("CARGO_CRATE_NAME"),
        args.config.listen_address
    );

    let keys = vec![thrussh_keys::key::KeyPair::generate_ed25519().unwrap()];

    let thrussh_config = Arc::new(thrussh::server::Config {
        server_id: args.config.server_id.to_string(),
        methods: MethodSet::PASSWORD | MethodSet::PUBLICKEY | MethodSet::KEYBOARD_INTERACTIVE,
        keys,
        auth_rejection_time: std::time::Duration::from_secs(1),
        ..thrussh::server::Config::default()
    });

    let (reload_send, reload_recv) = watch::channel(());
    let (shutdown_send, shutdown_recv) = oneshot::channel();

    let (audit_send, audit_handle) =
        audit::start_audit_writer(args.config.clone(), reload_recv, shutdown_recv);
    let mut audit_handle = audit_handle.fuse();

    let server = Server::new(args.config.clone(), audit_send);
    let listen_address = args.config.listen_address.to_string();

    // TODO: needs clean shutdowns on clients
    let fut = thrussh::server::run(thrussh_config, &listen_address, server);

    let shutdown_watcher = watch_for_shutdown(shutdown_send);
    let reload_watcher = watch_for_reloads(reload_send);

    tokio::select! {
        res = fut => res?,
        res = &mut audit_handle => res??,
        res = shutdown_watcher => res?,
        res = reload_watcher => res?,
    }

    info!("Finishing audit log writes");
    audit_handle.await??;
    info!("Audit log writes finished");

    Ok(())
}

async fn watch_for_shutdown(send: oneshot::Sender<()>) -> Result<(), anyhow::Error> {
    tokio::signal::ctrl_c().await?;
    info!("Received ctrl-c, initiating shutdown");

    let _res = send.send(());

    Ok(())
}

async fn watch_for_reloads(send: watch::Sender<()>) -> Result<(), anyhow::Error> {
    let mut signal = tokio::signal::unix::signal(SignalKind::hangup())?;

    while let Some(()) = signal.recv().await {
        info!("Received SIGHUP, broadcasting reload");
        let _res = send.send(());
    }

    Ok(())
}
diff --git a/src/server.rs b/src/server.rs
deleted file mode 100644
index f3437a6..0000000
--- a/src/server.rs
+++ /dev/null
@@ -1,613 +0,0 @@
use crate::audit::{
    ExecCommandEvent, SignalEvent, SubsystemRequestEvent, TcpIpForwardEvent, WindowAdjustedEvent,
    WindowChangeRequestEvent,
};
use crate::{
    audit::{
        AuditLog, AuditLogAction, LoginAttemptEvent, OpenDirectTcpIpEvent, OpenX11Event,
        PtyRequestEvent, X11RequestEvent,
    },
    command::run_command,
    config::Config,
    state::State,
};
use futures::{
    future::{BoxFuture, InspectErr},
    FutureExt, TryFutureExt,
};
use std::{
    borrow::Cow,
    future::Future,
    net::SocketAddr,
    pin::Pin,
    sync::Arc,
    task::{Context, Poll},
};
use thrussh::{
    server::{Auth, Response, Session},
    ChannelId, Pty, Sig,
};
use thrussh_keys::key::PublicKey;
use tokio::sync::mpsc::UnboundedSender;
use tracing::{debug, error, info, info_span, instrument::Instrumented, Instrument, Span};

pub static KEYBOARD_INTERACTIVE_PROMPT: &[(Cow<'static, str>, bool)] =
    &[(Cow::Borrowed("Password: "), false)];
pub const SHELL_PROMPT: &str = "bash-5.1$ ";

#[derive(Clone)]
pub struct Server {
    config: Arc<Config>,
    state: Arc<State>,
    audit_send: UnboundedSender<AuditLog>,
}

impl Server {
    pub fn new(config: Arc<Config>, audit_send: UnboundedSender<AuditLog>) -> Self {
        Self {
            config,
            state: Arc::new(State::default()),
            audit_send,
        }
    }
}

impl thrussh::server::Server for Server {
    type Handler = Connection;

    fn new(&mut self, peer_addr: Option<SocketAddr>) -> Self::Handler {
        let connection_id = uuid::Uuid::new_v4();

        Connection {
            span: info_span!("connection", ?peer_addr, %connection_id),
            server: self.clone(),
            audit_log: AuditLog {
                connection_id,
                peer_address: peer_addr,
                ..AuditLog::default()
            },
        }
    }
}

pub struct Connection {
    span: Span,
    server: Server,
    audit_log: AuditLog,
}

impl Connection {
    fn try_login(&mut self, user: &str, password: &str) -> bool {
        let res = if self
            .server
            .state
            .previously_accepted_passwords
            .seen(password)
        {
            info!(user, password, "Accepted login due to it being used before");
            true
        } else if fastrand::f64() <= self.server.config.access_probability {
            info!(user, password, "Accepted login randomly");
            self.server
                .state
                .previously_accepted_passwords
                .store(password);
            true
        } else {
            info!(?user, ?password, "Rejected login");
            false
        };

        self.audit_log.push_action(AuditLogAction::LoginAttempt(
            LoginAttemptEvent::UsernamePassword {
                username: Box::from(user),
                password: Box::from(password),
            },
        ));

        res
    }
}

impl thrussh::server::Handler for Connection {
    type Error = anyhow::Error;
    type FutureAuth = HandlerFuture<Auth>;
    type FutureUnit = HandlerFuture<Session>;
    type FutureBool =
        ServerFuture<Self::Error, BoxFuture<'static, Result<(Self, Session, bool), Self::Error>>>;

    fn finished_auth(self, auth: Auth) -> Self::FutureAuth {
        let span = info_span!(parent: &self.span, "finished_auth");
        futures::future::ok((self, auth)).boxed().wrap(span)
    }

    fn finished_bool(self, b: bool, session: Session) -> Self::FutureBool {
        let span = info_span!(parent: &self.span, "finished_bool");
        let _entered = span.enter();

        futures::future::ok((self, session, b))
            .boxed()
            .wrap(Span::current())
    }

    fn finished(self, session: Session) -> Self::FutureUnit {
        let span = info_span!(parent: &self.span, "finished");
        let _entered = span.enter();

        futures::future::ok((self, session))
            .boxed()
            .wrap(Span::current())
    }

    fn auth_none(self, _user: &str) -> Self::FutureAuth {
        let span = info_span!(parent: &self.span, "auth_none");

        self.finished_auth(Auth::UnsupportedMethod)
            .boxed()
            .wrap(span)
    }

    fn auth_password(mut self, user: &str, password: &str) -> Self::FutureAuth {
        let span = info_span!(parent: &self.span, "auth_password");
        let _entered = span.enter();

        let res = if self.try_login(user, password) {
            Auth::Accept
        } else {
            Auth::Reject
        };

        self.finished_auth(res)
    }

    fn auth_publickey(mut self, _user: &str, public_key: &PublicKey) -> Self::FutureAuth {
        let span = info_span!(parent: &self.span, "auth_publickey");
        let _entered = span.enter();

        let kind = public_key.name();
        let fingerprint = public_key.fingerprint();

        self.audit_log
            .push_action(AuditLogAction::LoginAttempt(LoginAttemptEvent::PublicKey {
                kind,
                fingerprint: Box::from(fingerprint),
            }));

        self.finished_auth(Auth::Reject)
            .boxed()
            .wrap(Span::current())
    }

    fn auth_keyboard_interactive(
        mut self,
        user: &str,
        _submethods: &str,
        mut response: Option<Response>,
    ) -> Self::FutureAuth {
        let span = info_span!(parent: &self.span, "auth_keyboard_interactive");
        let _entered = span.enter();

        let result = if let Some(password) = response
            .as_mut()
            .and_then(Response::next)
            .map(String::from_utf8_lossy)
        {
            if self.try_login(user, password.as_ref()) {
                Auth::Accept
            } else {
                Auth::Reject
            }
        } else {
            debug!("Client is attempting keyboard-interactive, obliging");

            Auth::Partial {
                name: "".into(),
                instructions: "".into(),
                prompts: KEYBOARD_INTERACTIVE_PROMPT.into(),
            }
        };

        self.finished_auth(result)
    }

    fn channel_close(self, channel: ChannelId, mut session: Session) -> Self::FutureUnit {
        let span = info_span!(parent: &self.span, "channel_close");
        let _entered = span.enter();

        session.channel_success(channel);
        self.finished(session).boxed().wrap(Span::current())
    }

    fn channel_eof(self, channel: ChannelId, mut session: Session) -> Self::FutureUnit {
        let span = info_span!(parent: &self.span, "channel_eof");
        let _entered = span.enter();

        info!("In here");

        session.channel_success(channel);
        self.finished(session).boxed().wrap(Span::current())
    }

    fn channel_open_session(self, channel: ChannelId, mut session: Session) -> Self::FutureUnit {
        let span = info_span!(parent: &self.span, "channel_open_session");
        let _entered = span.enter();

        session.channel_success(channel);
        self.finished(session).boxed().wrap(Span::current())
    }

    fn channel_open_x11(
        mut self,
        channel: ChannelId,
        originator_address: &str,
        originator_port: u32,
        mut session: Session,
    ) -> Self::FutureUnit {
        let span = info_span!(parent: &self.span, "channel_open_x11");
        let _entered = span.enter();

        self.audit_log
            .push_action(AuditLogAction::OpenX11(OpenX11Event {
                originator_address: Box::from(originator_address),
                originator_port,
            }));

        session.channel_failure(channel);
        self.finished(session).boxed().wrap(Span::current())
    }

    fn channel_open_direct_tcpip(
        mut self,
        channel: ChannelId,
        host_to_connect: &str,
        port_to_connect: u32,
        originator_address: &str,
        originator_port: u32,
        mut session: Session,
    ) -> Self::FutureUnit {
        let span = info_span!(parent: &self.span, "channel_open_direct_tcpip");
        let _entered = span.enter();

        self.audit_log
            .push_action(AuditLogAction::OpenDirectTcpIp(OpenDirectTcpIpEvent {
                host_to_connect: Box::from(host_to_connect),
                port_to_connect,
                originator_address: Box::from(originator_address),
                originator_port,
            }));

        session.channel_failure(channel);
        self.finished(session).boxed().wrap(Span::current())
    }

    fn data(mut self, channel: ChannelId, data: &[u8], mut session: Session) -> Self::FutureUnit {
        let span = info_span!(parent: &self.span, "data");
        let _entered = span.enter();

        let data = shlex::split(String::from_utf8_lossy(data).as_ref());

        async move {
            if let Some(args) = data {
                run_command(&args, channel, &mut session).await;
                self.audit_log
                    .push_action(AuditLogAction::ExecCommand(ExecCommandEvent {
                        args: Box::from(args),
                    }));
            }

            session.data(channel, SHELL_PROMPT.to_string().into());
            self.finished(session).await
        }
        .boxed()
        .wrap(Span::current())
    }

    fn extended_data(
        self,
        _channel: ChannelId,
        _code: u32,
        _data: &[u8],
        session: Session,
    ) -> Self::FutureUnit {
        let span = info_span!(parent: &self.span, "extended_data");
        let _entered = span.enter();

        self.finished(session).boxed().wrap(Span::current())
    }

    fn window_adjusted(
        mut self,
        _channel: ChannelId,
        new_window_size: usize,
        session: Session,
    ) -> Self::FutureUnit {
        let span = info_span!(parent: &self.span, "window_adjusted");
        let _entered = span.enter();

        self.audit_log
            .push_action(AuditLogAction::WindowAdjusted(WindowAdjustedEvent {
                new_size: new_window_size,
            }));

        self.finished(session).boxed().wrap(Span::current())
    }

    fn adjust_window(&mut self, _channel: ChannelId, current: u32) -> u32 {
        let span = info_span!(parent: &self.span, "adjust_window");
        let _entered = span.enter();

        current
    }

    fn pty_request(
        mut self,
        channel: ChannelId,
        term: &str,
        col_width: u32,
        row_height: u32,
        pix_width: u32,
        pix_height: u32,
        modes: &[(Pty, u32)],
        mut session: Session,
    ) -> Self::FutureUnit {
        let span = info_span!(parent: &self.span, "pty_request");
        let _entered = span.enter();

        self.audit_log
            .push_action(AuditLogAction::PtyRequest(PtyRequestEvent {
                term: Box::from(term),
                col_width,
                row_height,
                pix_width,
                pix_height,
                modes: Box::from(
                    modes
                        .iter()
                        .copied()
                        .map(|(pty, val)| (pty as u8, val))
                        .collect::<Vec<_>>(),
                ),
            }));

        session.channel_failure(channel);
        self.finished(session).boxed().wrap(Span::current())
    }

    fn x11_request(
        mut self,
        channel: ChannelId,
        single_connection: bool,
        x11_auth_protocol: &str,
        x11_auth_cookie: &str,
        x11_screen_number: u32,
        mut session: Session,
    ) -> Self::FutureUnit {
        let span = info_span!(parent: &self.span, "x11_request");
        let _entered = span.enter();

        self.audit_log
            .push_action(AuditLogAction::X11Request(X11RequestEvent {
                single_connection,
                x11_auth_protocol: Box::from(x11_auth_protocol),
                x11_auth_cookie: Box::from(x11_auth_cookie),
                x11_screen_number,
            }));

        session.channel_failure(channel);
        self.finished(session).boxed().wrap(Span::current())
    }

    fn env_request(
        mut self,
        channel: ChannelId,
        variable_name: &str,
        variable_value: &str,
        mut session: Session,
    ) -> Self::FutureUnit {
        let span = info_span!(parent: &self.span, "env_request");
        let _entered = span.enter();

        self.audit_log
            .environment_variables
            .push((Box::from(variable_name), Box::from(variable_value)));

        session.channel_success(channel);
        self.finished(session).boxed().wrap(Span::current())
    }

    fn shell_request(mut self, channel: ChannelId, mut session: Session) -> Self::FutureUnit {
        let span = info_span!(parent: &self.span, "shell_request");
        let _entered = span.enter();

        self.audit_log.push_action(AuditLogAction::ShellRequested);

        session.data(channel, SHELL_PROMPT.to_string().into());

        session.channel_success(channel);
        self.finished(session).boxed().wrap(Span::current())
    }

    fn exec_request(
        mut self,
        channel: ChannelId,
        data: &[u8],
        mut session: Session,
    ) -> Self::FutureUnit {
        let span = info_span!(parent: &self.span, "exec_request");
        let _entered = span.enter();

        let data = shlex::split(String::from_utf8_lossy(data).as_ref());

        async move {
            if let Some(args) = data {
                run_command(&args, channel, &mut session).await;
                self.audit_log
                    .push_action(AuditLogAction::ExecCommand(ExecCommandEvent {
                        args: Box::from(args),
                    }));

                session.channel_success(channel);
            } else {
                session.channel_failure(channel);
            }

            self.finished(session).await
        }
        .boxed()
        .wrap(Span::current())
    }

    fn subsystem_request(
        mut self,
        channel: ChannelId,
        name: &str,
        mut session: Session,
    ) -> Self::FutureUnit {
        let span = info_span!(parent: &self.span, "subsystem_request");
        let _entered = span.enter();

        self.audit_log
            .push_action(AuditLogAction::SubsystemRequest(SubsystemRequestEvent {
                name: Box::from(name),
            }));

        session.channel_failure(channel);
        self.finished(session).boxed().wrap(Span::current())
    }

    fn window_change_request(
        mut self,
        channel: ChannelId,
        col_width: u32,
        row_height: u32,
        pix_width: u32,
        pix_height: u32,
        mut session: Session,
    ) -> Self::FutureUnit {
        let span = info_span!(parent: &self.span, "window_change_request");
        let _entered = span.enter();

        self.audit_log
            .push_action(AuditLogAction::WindowChangeRequest(
                WindowChangeRequestEvent {
                    col_width,
                    row_height,
                    pix_width,
                    pix_height,
                },
            ));

        session.channel_success(channel);
        self.finished(session).boxed().wrap(Span::current())
    }

    fn signal(
        mut self,
        _channel: ChannelId,
        signal_name: Sig,
        session: Session,
    ) -> Self::FutureUnit {
        let span = info_span!(parent: &self.span, "signal");
        let _entered = span.enter();

        self.audit_log
            .push_action(AuditLogAction::Signal(SignalEvent {
                name: format!("{signal_name:?}").into(),
            }));

        self.finished(session).boxed().wrap(Span::current())
    }

    fn tcpip_forward(mut self, address: &str, port: u32, session: Session) -> Self::FutureBool {
        let span = info_span!(parent: &self.span, "tcpip_forward");
        let _entered = span.enter();

        self.audit_log
            .push_action(AuditLogAction::TcpIpForward(TcpIpForwardEvent {
                address: Box::from(address),
                port,
            }));

        self.finished_bool(false, session)
            .boxed()
            .wrap(Span::current())
    }

    fn cancel_tcpip_forward(
        mut self,
        address: &str,
        port: u32,
        session: Session,
    ) -> Self::FutureBool {
        let span = info_span!(parent: &self.span, "cancel_tcpip_forward");
        let _entered = span.enter();

        self.audit_log
            .push_action(AuditLogAction::CancelTcpIpForward(TcpIpForwardEvent {
                address: Box::from(address),
                port,
            }));

        self.finished_bool(false, session)
            .boxed()
            .wrap(Span::current())
    }
}

impl Drop for Connection {
    fn drop(&mut self) {
        let span = info_span!(parent: &self.span, "drop");
        let _entered = span.enter();

        info!("Connection closed");

        let _res = self
            .server
            .audit_send
            .send(std::mem::take(&mut self.audit_log));
    }
}

type HandlerResult<T> = Result<T, <Connection as thrussh::server::Handler>::Error>;
type HandlerFuture<T> = ServerFuture<
    <Connection as thrussh::server::Handler>::Error,
    BoxFuture<'static, HandlerResult<(Connection, T)>>,
>;

/// Wraps a future, providing logging and instrumentation. This provides a newtype over the future
/// (`ServerFuture`) in order to enforce usage within the `thrussh::server::Handler` impl.
pub trait WrapFuture: Sized {
    type Ok;
    type Err;

    fn wrap(self, span: Span) -> ServerFuture<Self::Err, Self>;
}

impl<T, F: Future<Output = Result<T, anyhow::Error>>> WrapFuture for F {
    type Ok = T;
    type Err = anyhow::Error;

    fn wrap(self, span: Span) -> ServerFuture<Self::Err, Self> {
        ServerFuture(
            self.inspect_err(log_err as fn(&anyhow::Error))
                .instrument(span),
        )
    }
}

/// Logs an error from a future result.
fn log_err(e: &anyhow::Error) {
    error!("Connection closed due to: {}", e);
}

/// A wrapped future, providing logging ad instrumentation.
#[allow(clippy::type_complexity)]
pub struct ServerFuture<E, F>(Instrumented<InspectErr<F, fn(&E)>>);

impl<T, E, F: Future<Output = Result<T, E>> + Unpin> Future for ServerFuture<E, F> {
    type Output = F::Output;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        Pin::new(&mut self.0).poll(cx)
    }
}
diff --git a/src/state.rs b/src/state.rs
deleted file mode 100644
index 0ade91a..0000000
--- a/src/state.rs
+++ /dev/null
@@ -1,22 +0,0 @@
use parking_lot::RwLock;
use std::collections::HashSet;

#[derive(Default)]
pub struct State {
    /// A list of passwords that have previously been accepted, and will forever be accepted
    /// to further attract the bear.
    pub previously_accepted_passwords: StoredPasswords,
}

#[derive(Default)]
pub struct StoredPasswords(RwLock<HashSet<Box<str>>>);

impl StoredPasswords {
    pub fn seen(&self, password: &str) -> bool {
        self.0.read().contains(password)
    }

    pub fn store(&self, password: &str) -> bool {
        self.0.write().insert(Box::from(password.to_string()))
    }
}