From b947760e2257ee96b9e150b4666be386f3628da9 Mon Sep 17 00:00:00 2001 From: Jordan Doyle Date: Mon, 26 Jun 2023 01:05:18 +0100 Subject: [PATCH] Add timescaledb exporter application --- Cargo.lock | 441 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++----------- Cargo.toml | 31 ++++++------------------------- config.toml | 10 ---------- flake.nix | 84 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++---------- pisshoff-server/Cargo.toml | 27 +++++++++++++++++++++++++++ pisshoff-server/config.toml | 10 ++++++++++ pisshoff-server/src/audit.rs | 76 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ pisshoff-server/src/command.rs | 51 +++++++++++++++++++++++++++++++++++++++++++++++++++ pisshoff-server/src/config.rs | 72 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ pisshoff-server/src/main.rs | 110 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ pisshoff-server/src/server.rs | 620 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ pisshoff-server/src/state.rs | 22 ++++++++++++++++++++++ pisshoff-timescaledb-exporter/Cargo.toml | 23 +++++++++++++++++++++++ pisshoff-timescaledb-exporter/config.toml | 7 +++++++ pisshoff-timescaledb-exporter/migrations/V1__initial.sql | 30 ++++++++++++++++++++++++++++++ pisshoff-timescaledb-exporter/src/config.rs | 39 +++++++++++++++++++++++++++++++++++++++ pisshoff-timescaledb-exporter/src/main.rs | 156 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ pisshoff-types/Cargo.toml | 12 ++++++++++++ pisshoff-types/src/audit.rs | 162 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ pisshoff-types/src/lib.rs | 4 ++++ src/audit.rs | 233 ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- src/command.rs | 51 --------------------------------------------------- src/config.rs | 72 ------------------------------------------------------------------------ src/main.rs | 103 ------------------------------------------------------------------------------------------------------- src/server.rs | 613 ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- src/state.rs | 22 ---------------------- 26 files changed, 1931 insertions(+), 1150 deletions(-) delete mode 100644 config.toml create mode 100644 pisshoff-server/Cargo.toml create mode 100644 pisshoff-server/config.toml create mode 100644 pisshoff-server/src/audit.rs create mode 100644 pisshoff-server/src/command.rs create mode 100644 pisshoff-server/src/config.rs create mode 100644 pisshoff-server/src/main.rs create mode 100644 pisshoff-server/src/server.rs create mode 100644 pisshoff-server/src/state.rs create mode 100644 pisshoff-timescaledb-exporter/Cargo.toml create mode 100644 pisshoff-timescaledb-exporter/config.toml create mode 100644 pisshoff-timescaledb-exporter/migrations/V1__initial.sql create mode 100644 pisshoff-timescaledb-exporter/src/config.rs create mode 100644 pisshoff-timescaledb-exporter/src/main.rs create mode 100644 pisshoff-types/Cargo.toml create mode 100644 pisshoff-types/src/audit.rs create mode 100644 pisshoff-types/src/lib.rs delete mode 100644 src/audit.rs delete mode 100644 src/command.rs delete mode 100644 src/config.rs delete mode 100644 src/main.rs delete mode 100644 src/server.rs delete mode 100644 src/state.rs diff --git a/Cargo.lock b/Cargo.lock index bc4764f..8645ef6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -22,6 +22,15 @@ dependencies = [ ] [[package]] +name = "aho-corasick" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "43f6cb1bf222025340178f382c426f13757b2960e89779dfcb319c32542a5a41" +dependencies = [ + "memchr", +] + +[[package]] name = "anstream" version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -77,12 +86,29 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c7d0618f0e0b7e8ff11427422b64564d5fb0be1940354bfe2e0529b18a9d9b8" [[package]] +name = "async-trait" +version = "0.1.68" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9ccdd8f2a161be9bd5c023df56f1b2a0bd1d83872ae53b71a84a12c9bf6e842" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.20", +] + +[[package]] name = "autocfg" version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" [[package]] +name = "base64" +version = "0.21.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "604178f6c5c21f02dc555784810edfb88d34ac2c73b2eae109655649ee73ce3d" + +[[package]] name = "base64ct" version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -232,7 +258,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn", + "syn 2.0.20", ] [[package]] @@ -311,6 +337,42 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2e66c9d817f1720209181c316d28635c050fa304f9c79e47a520882661b7308" [[package]] +name = "deadpool" +version = "0.9.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "421fe0f90f2ab22016f32a9881be5134fdd71c65298917084b0c7477cbc3856e" +dependencies = [ + "async-trait", + "deadpool-runtime", + "num_cpus", + "retain_mut", + "serde", + "tokio", +] + +[[package]] +name = "deadpool-postgres" +version = "0.10.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "836a24a9d49deefe610b8b60c767a7412e9a931d79a89415cd2d2d71630ca8d7" +dependencies = [ + "deadpool", + "log", + "serde", + "tokio", + "tokio-postgres", +] + +[[package]] +name = "deadpool-runtime" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eaa37046cc0f6c3cc6090fbdbf73ef0b8ef4cfcc37f6befc0020f63e8cf121e1" +dependencies = [ + "tokio", +] + +[[package]] name = "digest" version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -384,6 +446,12 @@ dependencies = [ ] [[package]] +name = "fallible-iterator" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4443176a9f2c162692bd3d352d745ef9413eec5782a80d8fd6f8a1ac692a07f7" + +[[package]] name = "fastrand" version = "1.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -403,6 +471,15 @@ dependencies = [ ] [[package]] +name = "form_urlencoded" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a62bc1cf6f830c2ec14a513a9fb124d0a213a629668a4186f329db21fe045652" +dependencies = [ + "percent-encoding", +] + +[[package]] name = "futures" version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -458,7 +535,7 @@ checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.20", ] [[package]] @@ -550,6 +627,25 @@ dependencies = [ ] [[package]] +name = "hmac" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c49c37c09c17a53d937dfbb742eb3a961d65a994e6bcdcf37e7399d0cc8ab5e" +dependencies = [ + "digest 0.10.7", +] + +[[package]] +name = "idna" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d20d6b07bfbc108882d88ed8e37d39636dcc260e15e30c45e6ba089610b917c" +dependencies = [ + "unicode-bidi", + "unicode-normalization", +] + +[[package]] name = "indexmap" version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -671,6 +767,15 @@ dependencies = [ ] [[package]] +name = "md-5" +version = "0.10.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6365506850d44bff6e2fbcb5176cf63650e48bd45ef2fe2665ae1570e0f4b9ca" +dependencies = [ + "digest 0.10.7", +] + +[[package]] name = "md5" version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -683,6 +788,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d" [[package]] +name = "memoffset" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5de893c32cde5f383baa4c04c5d6dbdd735cfd4a794b0debdb2bb1b421da5ff4" +dependencies = [ + "autocfg", +] + +[[package]] name = "miniz_oxide" version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -703,6 +817,20 @@ dependencies = [ ] [[package]] +name = "nix" +version = "0.26.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bfdda3d196821d6af13126e40375cdf7da646a96114af134d5f417a9a1dc8e1a" +dependencies = [ + "bitflags", + "cfg-if", + "libc", + "memoffset", + "pin-utils", + "static_assertions", +] + +[[package]] name = "nu-ansi-term" version = "0.46.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -812,7 +940,7 @@ checksum = "d95f5254224e617595d2cc3cc73ff0a5eaf2637519e25f03388154e9378b6ffa" dependencies = [ "base64ct", "crypto-mac", - "hmac", + "hmac 0.11.0", "password-hash", "sha2 0.9.9", ] @@ -827,6 +955,30 @@ dependencies = [ ] [[package]] +name = "percent-encoding" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b2a4787296e9989611394c33f193f676704af1686e70b8f8033ab5ba9a35a94" + +[[package]] +name = "phf" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ade2d8b8f33c7333b51bcf0428d37e217e9f32192ae4772156f65063b8ce03dc" +dependencies = [ + "phf_shared", +] + +[[package]] +name = "phf_shared" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "90fcb95eef784c2ac79119d1dd819e162b5da872ce6f3c3abe1e8ca1c082f72b" +dependencies = [ + "siphasher", +] + +[[package]] name = "pin-project-lite" version = "0.2.9" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -839,7 +991,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" [[package]] -name = "pisshoff" +name = "pisshoff-server" version = "0.1.0" dependencies = [ "anyhow", @@ -847,13 +999,14 @@ dependencies = [ "fastrand", "futures", "itertools", + "nix", "parking_lot", + "pisshoff-types", "serde", "serde_json", "shlex", "thrussh", "thrussh-keys", - "time", "tokio", "toml", "tracing", @@ -862,12 +1015,75 @@ dependencies = [ ] [[package]] +name = "pisshoff-timescaledb-exporter" +version = "0.1.0" +dependencies = [ + "anyhow", + "clap", + "deadpool-postgres", + "futures", + "pisshoff-types", + "refinery", + "serde", + "serde_json", + "tokio", + "tokio-postgres", + "tokio-util", + "toml", + "tracing", + "tracing-subscriber", +] + +[[package]] +name = "pisshoff-types" +version = "0.1.0" +dependencies = [ + "serde", + "strum", + "time", + "uuid", +] + +[[package]] name = "pkg-config" version = "0.3.27" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "26072860ba924cbfa98ea39c8c19b4dd6a4a25423dbdf219c1eca91aa0cf6964" [[package]] +name = "postgres-protocol" +version = "0.6.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78b7fa9f396f51dffd61546fd8573ee20592287996568e6175ceb0f8699ad75d" +dependencies = [ + "base64", + "byteorder", + "bytes", + "fallible-iterator", + "hmac 0.12.1", + "md-5", + "memchr", + "rand", + "sha2 0.10.7", + "stringprep", +] + +[[package]] +name = "postgres-types" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f028f05971fe20f512bcc679e2c10227e57809a3af86a7606304435bc8896cd6" +dependencies = [ + "bytes", + "fallible-iterator", + "postgres-protocol", + "serde", + "serde_json", + "time", + "uuid", +] + +[[package]] name = "ppv-lite86" version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -951,11 +1167,58 @@ dependencies = [ ] [[package]] +name = "refinery" +version = "0.8.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cdb0436d0dd7bd8d4fce1e828751fa79742b08e35f27cfea7546f8a322b5ef24" +dependencies = [ + "refinery-core", + "refinery-macros", +] + +[[package]] +name = "refinery-core" +version = "0.8.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19206547cd047e8f4dfa6b20c30d3ecaf24be05841b6aa0aa926a47a3d0662bb" +dependencies = [ + "async-trait", + "cfg-if", + "lazy_static", + "log", + "regex", + "serde", + "siphasher", + "thiserror", + "time", + "tokio", + "tokio-postgres", + "toml", + "url", + "walkdir", +] + +[[package]] +name = "refinery-macros" +version = "0.8.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d94d4b9241859ba19eaa5c04c86e782eb3aa0aae2c5868e0cfa90c856e58a174" +dependencies = [ + "proc-macro2", + "quote", + "refinery-core", + "regex", + "syn 2.0.20", +] + +[[package]] name = "regex" version = "1.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d0ab3ca65655bb1e41f2a8c8cd662eb4fb035e67c3f78da1d61dffe89d07300f" dependencies = [ + "aho-corasick", + "memchr", "regex-syntax 0.7.2", ] @@ -981,6 +1244,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "436b050e76ed2903236f032a59761c1eb99e1b0aead2c257922771dab1fc8c78" [[package]] +name = "retain_mut" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4389f1d5789befaf6029ebd9f7dac4af7f7e3d61b69d4f30e2ac02b57e7712b0" + +[[package]] name = "rustix" version = "0.37.20" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -995,6 +1264,12 @@ dependencies = [ ] [[package]] +name = "rustversion" +version = "1.0.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4f3208ce4d8448b3f3e7d168a73f5e0c43a61e32930de3bceeccedb388b6bf06" + +[[package]] name = "ryu" version = "1.0.13" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1032,7 +1307,7 @@ checksum = "d9735b638ccc51c28bf6914d90a2e9725b377144fc612c49a611fddd1b631d68" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.20", ] [[package]] @@ -1104,6 +1379,12 @@ dependencies = [ ] [[package]] +name = "siphasher" +version = "0.3.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7bd3e3206899af3f8b12af284fafc038cc1dc2b41d1b89dd17297221c5d225de" + +[[package]] name = "slab" version = "0.4.8" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1129,12 +1410,60 @@ dependencies = [ ] [[package]] +name = "socket2" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2538b18701741680e0322a2302176d3253a35388e2e62f172f64f4f16605f877" +dependencies = [ + "libc", + "windows-sys", +] + +[[package]] +name = "static_assertions" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" + +[[package]] +name = "stringprep" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ee348cb74b87454fff4b551cbf727025810a004f88aeacae7f85b87f4e9a1c1" +dependencies = [ + "unicode-bidi", + "unicode-normalization", +] + +[[package]] name = "strsim" version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" [[package]] +name = "strum" +version = "0.24.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "063e6045c0e62079840579a7e47a355ae92f60eb74daaf156fb1e84ba164e63f" +dependencies = [ + "strum_macros", +] + +[[package]] +name = "strum_macros" +version = "0.24.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e385be0d24f186b4ce2f9982191e7101bb737312ad61c1f2f984f34bcf85d59" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "rustversion", + "syn 1.0.109", +] + +[[package]] name = "subtle" version = "2.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1142,6 +1471,17 @@ checksum = "6bdef32e8150c2a081110b42772ffe7d7c9032b606bc226c8260fd97e0976601" [[package]] name = "syn" +version = "1.0.109" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72b64191b275b66ffe2469e8af2c1cfe3bafa67b529ead792a6d0160888b4237" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + +[[package]] +name = "syn" version = "2.0.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fcb8d4cebc40aa517dfb69618fa647a346562e67228e2236ae0042ee6ac14775" @@ -1168,7 +1508,7 @@ checksum = "f9456a42c5b0d803c8cd86e73dd7cc9edd429499f37a3550d286d5e86720569f" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.20", ] [[package]] @@ -1216,7 +1556,7 @@ dependencies = [ "data-encoding", "dirs", "futures", - "hmac", + "hmac 0.11.0", "log", "md5", "num-bigint", @@ -1274,6 +1614,21 @@ dependencies = [ ] [[package]] +name = "tinyvec" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87cc5ceb3875bb20c2890005a4e226a4651264a5c75edb2421b52861a0a0cb50" +dependencies = [ + "tinyvec_macros", +] + +[[package]] +name = "tinyvec_macros" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" + +[[package]] name = "tokio" version = "1.28.2" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1287,7 +1642,7 @@ dependencies = [ "parking_lot", "pin-project-lite", "signal-hook-registry", - "socket2", + "socket2 0.4.9", "tokio-macros", "windows-sys", ] @@ -1300,7 +1655,31 @@ checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.20", +] + +[[package]] +name = "tokio-postgres" +version = "0.7.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e89f6234aa8fd43779746012fcf53603cdb91fdd8399aa0de868c2d56b6dde1" +dependencies = [ + "async-trait", + "byteorder", + "bytes", + "fallible-iterator", + "futures-channel", + "futures-util", + "log", + "parking_lot", + "percent-encoding", + "phf", + "pin-project-lite", + "postgres-protocol", + "postgres-types", + "socket2 0.5.3", + "tokio", + "tokio-util", ] [[package]] @@ -1315,6 +1694,20 @@ dependencies = [ ] [[package]] +name = "tokio-util" +version = "0.7.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "806fe8c2c87eccc8b3267cbae29ed3ab2d0bd37fca70ab622e46aaa9375ddb7d" +dependencies = [ + "bytes", + "futures-core", + "futures-sink", + "pin-project-lite", + "tokio", + "tracing", +] + +[[package]] name = "toml" version = "0.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1368,7 +1761,7 @@ checksum = "5f4f31f56159e98206da9efd823404b79b6ef3143b4a7ab76e67b1751b25a4ab" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.20", ] [[package]] @@ -1417,12 +1810,38 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "497961ef93d974e23eb6f433eb5fe1b7930b659f06d12dec6fc44a8f554c0bba" [[package]] +name = "unicode-bidi" +version = "0.3.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "92888ba5573ff080736b3648696b70cafad7d250551175acbaa4e0385b3e1460" + +[[package]] name = "unicode-ident" version = "1.0.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b15811caf2415fb889178633e7724bad2509101cde276048e013b9def5e51fa0" [[package]] +name = "unicode-normalization" +version = "0.1.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c5713f0fc4b5db668a2ac63cdb7bb4469d8c9fed047b1d0292cc7b0ce2ba921" +dependencies = [ + "tinyvec", +] + +[[package]] +name = "url" +version = "2.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50bff7831e19200a85b17131d085c25d7811bc4e186efdaf54bbd132994a88cb" +dependencies = [ + "form_urlencoded", + "idna", + "percent-encoding", +] + +[[package]] name = "utf8parse" version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" diff --git a/Cargo.toml b/Cargo.toml index b666354..bd808b4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,28 +1,9 @@ -[package] -name = "pisshoff" -version = "0.1.0" -edition = "2021" - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] -anyhow = "1.0" -clap = { version = "4.3", features = ["derive", "env", "cargo"] } -futures = "0.3" -parking_lot = "0.12" -fastrand = "1.9" -itertools = "0.10" -serde = { version = "1.0", features = ["derive"] } -serde_json = "1.0" -shlex = "1.1" -thrussh = "0.34" -thrussh-keys = "0.22" -time = { version = "0.3", features = ["serde", "formatting"] } -tokio = { version = "1.28", features = ["full"] } -toml = "0.7" -tracing = "0.1" -tracing-subscriber = { version = "0.3", features = ["env-filter"] } -uuid = { version = "1.3", features = ["v4", "serde"] } +[workspace] +members = [ + "pisshoff-server", + "pisshoff-timescaledb-exporter", + "pisshoff-types" +] [patch."crates-io"] thrussh = { git = "ssh://git@github.com/JordanForks/thrussh" } diff --git a/config.toml b/config.toml deleted file mode 100644 index b8c834e..0000000 --- a/config.toml +++ /dev/null @@ -1,10 +0,0 @@ -# Address for the server to listen on. -listen-address = "127.0.0.1:2233" - -# The probability that an authentication attempt will succeed, once a given password -# has been accepted once - it will be accepted for the rest of the lifetime of the -# instance. -access-probability = 0.2 - -# Path of the file to write audit logs to. -audit-output-file = "audit.jsonl" diff --git a/flake.nix b/flake.nix index 4d35e92..7cb88b1 100644 --- a/flake.nix +++ b/flake.nix @@ -14,33 +14,34 @@ { formatter = nixpkgs.legacyPackages."${system}".nixpkgs-fmt; - defaultPackage = naersk-lib.buildPackage { + packages.default = naersk-lib.buildPackage { src = ./.; nativeBuildInputs = with pkgs; [ pkg-config ]; buildInputs = with pkgs; [ libsodium ]; }; - devShell = with pkgs; mkShell { + + devShells.default = with pkgs; mkShell { buildInputs = [ cargo rustc rustfmt pre-commit rustPackages.clippy ]; RUST_SRC_PATH = rustPlatform.rustLibSrc; }; - nixosModules.default = { config, lib, pkgs, ... }: + nixosModules.pisshoff-server = { config, lib, pkgs, ... }: with lib; let - cfg = config.services.pisshoff; + cfg = config.services.pisshoff-server; in { - options.services.pisshoff = { - enable = mkEnableOption "pisshoff"; + options.services.pisshoff-server = { + enable = mkEnableOption "pisshoff-server"; settings = mkOption { type = (pkgs.formats.toml { }).type; default = { }; - description = "Specify the configuration for pisshoff in Nix"; + description = "Specify the configuration for pisshoff-server in Nix"; }; }; config = mkIf cfg.enable { - systemd.services.pisshoff = { + systemd.services.pisshoff-server = { enable = true; wantedBy = [ "multi-user.target" ]; after = [ "network-online.target" ]; @@ -51,7 +52,7 @@ in { Type = "exec"; - ExecStart = "${self.defaultPackage."${system}"}/bin/pisshoff -c \"${conf}\""; + ExecStart = "${self.packages."${system}".default}/bin/pisshoff-server -c \"${conf}\""; ExecReload = "${pkgs.coreutils}/bin/kill -HUP $MAINPID"; Restart = "on-failure"; @@ -84,7 +85,7 @@ }; }; - services.logrotate.settings.pisshoff = { + services.logrotate.settings.pisshoff-server = { files = "/var/log/pisshoff/audit.log"; rotate = 31; frequency = "daily"; @@ -96,5 +97,68 @@ }; }; }; + + nixosModules.pisshoff-timescaledb-exporter = { config, lib, pkgs, ... }: + with lib; + let + cfg = config.services.pisshoff-timescaledb-exporter; + in + { + options.services.pisshoff-timescaledb-exporter = { + enable = mkEnableOption "pisshoff-timescaledb-exporter"; + settings = mkOption { + type = (pkgs.formats.toml { }).type; + default = { }; + description = "Specify the configuration for pisshoff-timescaledb-exporter in Nix"; + }; + }; + + config = mkIf cfg.enable { + systemd.services.pisshoff-timescaledb-exporter = { + enable = true; + wantedBy = [ "multi-user.target" ]; + after = [ "network-online.target" ]; + serviceConfig = + let + format = pkgs.formats.toml { }; + conf = format.generate "pisshoff.toml" cfg.settings; + in + { + Type = "exec"; + ExecStart = "${self.packages."${system}".default}/bin/pisshoff-timescaledb-exporter -c \"${conf}\""; + ExecReload = "${pkgs.coreutils}/bin/kill -HUP $MAINPID"; + Restart = "on-failure"; + + RuntimeDirectory = "pisshoff-timescaledb-exporter"; + LogsDirectory = "pisshoff-timescaledb-exporter"; + CapabilityBoundingSet = ""; + NoNewPrivileges = true; + PrivateDevices = true; + PrivateTmp = true; + PrivateUsers = true; + PrivateMounts = true; + ProtectHome = true; + ProtectClock = true; + ProtectProc = "invisible"; + ProcSubset = "pid"; + ProtectKernelLogs = true; + ProtectKernelModules = true; + ProtectKernelTunables = true; + ProtectControlGroups = true; + ProtectHostname = true; + ProtectSystem = "strict"; + RestrictSUIDSGID = true; + RestrictRealtime = true; + RestrictNamespaces = true; + LockPersonality = true; + RemoveIPC = true; + MemoryDenyWriteExecute = true; + DynamicUser = true; + RestrictAddressFamilies = [ "AF_UNIX" "AF_INET" "AF_INET6" ]; + SystemCallFilter = [ "@system-service" "~@privileged" ]; + }; + }; + }; + }; }); } diff --git a/pisshoff-server/Cargo.toml b/pisshoff-server/Cargo.toml new file mode 100644 index 0000000..11445b6 --- /dev/null +++ b/pisshoff-server/Cargo.toml @@ -0,0 +1,27 @@ +[package] +name = "pisshoff-server" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +pisshoff-types = { path = "../pisshoff-types" } + +anyhow = "1.0" +clap = { version = "4.3", features = ["derive", "env", "cargo"] } +futures = "0.3" +parking_lot = "0.12" +fastrand = "1.9" +itertools = "0.10" +nix = { version = "0.26", features = ["hostname"] } +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" +shlex = "1.1" +thrussh = "0.34" +thrussh-keys = "0.22" +tokio = { version = "1.28", features = ["full"] } +toml = "0.7" +tracing = "0.1" +tracing-subscriber = { version = "0.3", features = ["env-filter"] } +uuid = { version = "1.3", features = ["v4", "serde"] } diff --git a/pisshoff-server/config.toml b/pisshoff-server/config.toml new file mode 100644 index 0000000..b8c834e --- /dev/null +++ b/pisshoff-server/config.toml @@ -0,0 +1,10 @@ +# Address for the server to listen on. +listen-address = "127.0.0.1:2233" + +# The probability that an authentication attempt will succeed, once a given password +# has been accepted once - it will be accepted for the rest of the lifetime of the +# instance. +access-probability = 0.2 + +# Path of the file to write audit logs to. +audit-output-file = "audit.jsonl" diff --git a/pisshoff-server/src/audit.rs b/pisshoff-server/src/audit.rs new file mode 100644 index 0000000..7ce2055 --- /dev/null +++ b/pisshoff-server/src/audit.rs @@ -0,0 +1,76 @@ +use crate::config::Config; +pub use pisshoff_types::audit::*; +use std::{io::ErrorKind, sync::Arc, time::Duration}; +use tokio::{ + fs::OpenOptions, + io::{AsyncWriteExt, BufWriter}, + sync::{oneshot, watch}, + task::JoinHandle, +}; +use tracing::{debug, info}; + +pub fn start_audit_writer( + config: Arc, + mut reload: watch::Receiver<()>, + mut shutdown_recv: oneshot::Receiver<()>, +) -> ( + tokio::sync::mpsc::UnboundedSender, + JoinHandle>, +) { + let (send, mut recv) = tokio::sync::mpsc::unbounded_channel(); + + let handle = tokio::spawn(async move { + let open_writer = || async { + let file = OpenOptions::default() + .create(true) + .append(true) + .open(&config.audit_output_file) + .await?; + Ok::<_, std::io::Error>(BufWriter::new(file)) + }; + + let mut writer = open_writer().await?; + let mut shutdown = false; + + while !shutdown { + tokio::select! { + log = recv.recv() => { + match log { + Some(log) => { + let log = serde_json::to_vec(&log) + .map_err(|e| std::io::Error::new(ErrorKind::Other, e))?; + writer.write_all(&log).await?; + writer.write_all("\n".as_bytes()).await?; + } + None => { + shutdown = true; + } + } + } + _ = &mut shutdown_recv => { + shutdown = true; + } + _ = tokio::time::sleep(Duration::from_secs(5)), if !writer.buffer().is_empty() => { + debug!("Flushing audits to disk"); + writer.flush().await?; + } + Ok(()) = reload.changed() => { + info!("Flushing audits to disk"); + writer.flush().await?; + + info!("Reopening handle to log file"); + writer = open_writer().await?; + + info!("Successfully re-opened log file"); + } + else => break, + } + } + + writer.flush().await?; + + Ok(()) + }); + + (send, handle) +} diff --git a/pisshoff-server/src/command.rs b/pisshoff-server/src/command.rs new file mode 100644 index 0000000..7929f7a --- /dev/null +++ b/pisshoff-server/src/command.rs @@ -0,0 +1,51 @@ +use itertools::Itertools; +use std::{f32, str::FromStr, time::Duration}; +use thrussh::{server::Session, ChannelId}; + +pub async fn run_command(args: &[String], channel: ChannelId, session: &mut Session) { + let Some(command) = args.get(0) else { + return; + }; + + match command.as_str() { + "echo" => { + session.data( + channel, + format!("{}\n", args.iter().skip(1).join(" ")).into(), + ); + } + "whoami" => { + // TODO: grab "logged in" user + session.data(channel, "root\n".to_string().into()); + } + "pwd" => { + // TODO: mock FHS + session.data(channel, "/root\n".to_string().into()); + } + "ls" => { + // pretend /root is empty until we mock the FHS + } + "exit" => { + let exit_status = args + .get(1) + .map(String::as_str) + .map_or(Ok(0), u32::from_str) + .unwrap_or(2); + + session.exit_status_request(channel, exit_status); + session.close(channel); + } + "sleep" => { + if let Some(Ok(secs)) = args.get(1).map(String::as_str).map(f32::from_str) { + tokio::time::sleep(Duration::from_secs_f32(secs)).await; + } + } + other => { + // TODO: fix stderr displaying out of order + session.data( + channel, + format!("bash: {other}: command not found\n").into(), + ); + } + } +} diff --git a/pisshoff-server/src/config.rs b/pisshoff-server/src/config.rs new file mode 100644 index 0000000..753d8eb --- /dev/null +++ b/pisshoff-server/src/config.rs @@ -0,0 +1,72 @@ +use clap::Parser; +use serde::{de::DeserializeOwned, Deserialize}; +use std::path::PathBuf; +use std::sync::Arc; +use std::{io::ErrorKind, net::SocketAddr}; + +/// Parser for command line arguments, these arguments can also be passed via capitalised env vars +/// of the same name. +#[derive(Parser)] +#[command(author, version, about, long_about = None)] +pub struct Args { + #[arg(short, long, env, value_parser = load_config::)] + pub config: Arc, + #[arg(short, long, action = clap::ArgAction::Count)] + pub verbose: u8, +} + +impl Args { + pub fn verbosity(&self) -> &'static str { + match self.verbose { + 0 => "info", + 1 => "debug,thrussh=info", + 2 => "debug", + _ => "trace", + } + } +} + +#[derive(Deserialize, Clone)] +#[serde(rename_all = "kebab-case")] +pub struct Config { + /// Address for the server to listen on. + #[serde(default = "Config::default_listen_address")] + pub listen_address: SocketAddr, + /// The probability that an authentication attempt will succeed, once a given password + /// has been accepted once - it will be accepted for the rest of the lifetime of the + /// instance. + #[serde(default = "Config::default_access_probability")] + pub access_probability: f64, + /// Path of the file to write audit logs to. + #[serde(default = "Config::default_audit_output_file")] + pub audit_output_file: PathBuf, + /// The server ID string sent at the beginning of the SSH connection. + #[serde(default = "Config::default_server_id")] + pub server_id: String, +} + +impl Config { + fn default_listen_address() -> SocketAddr { + "0.0.0.0:22".parse().unwrap() + } + + fn default_access_probability() -> f64 { + 0.2 + } + + fn default_audit_output_file() -> PathBuf { + "/var/log/pisshoff/audit.log".parse().unwrap() + } + + fn default_server_id() -> String { + "SSH-2.0-OpenSSH_9.3".to_string() + } +} + +fn load_config(path: &str) -> Result, std::io::Error> { + let file = std::fs::read_to_string(path)?; + + toml::from_str(&file) + .map(Arc::new) + .map_err(|e| std::io::Error::new(ErrorKind::Other, e)) +} diff --git a/pisshoff-server/src/main.rs b/pisshoff-server/src/main.rs new file mode 100644 index 0000000..379a4c4 --- /dev/null +++ b/pisshoff-server/src/main.rs @@ -0,0 +1,110 @@ +#![deny(clippy::pedantic)] +#![allow(clippy::module_name_repetitions)] + +use crate::{config::Args, server::Server}; +use anyhow::anyhow; +use clap::Parser; +use futures::FutureExt; +use std::sync::Arc; +use thrussh::MethodSet; +use tokio::{ + signal::unix::SignalKind, + sync::{oneshot, watch}, +}; +use tracing::{error, info}; +use tracing_subscriber::EnvFilter; + +mod audit; +mod command; +mod config; +mod server; +mod state; + +#[tokio::main] +async fn main() { + if let Err(e) = run().await { + error!("Failed to run {}: {}", env!("CARGO_CRATE_NAME"), e); + std::process::exit(1); + } +} + +async fn run() -> anyhow::Result<()> { + let args = Args::parse(); + + std::env::set_var("RUST_LOG", args.verbosity()); + + tracing_subscriber::fmt() + .with_env_filter(EnvFilter::from_default_env()) + .init(); + + info!( + "{} listening on {}", + env!("CARGO_CRATE_NAME"), + args.config.listen_address + ); + + let hostname = Box::leak( + nix::unistd::gethostname()? + .into_string() + .map_err(|_| anyhow!("invalid hostname"))? + .into_boxed_str(), + ); + let keys = vec![thrussh_keys::key::KeyPair::generate_ed25519().unwrap()]; + + let thrussh_config = Arc::new(thrussh::server::Config { + server_id: args.config.server_id.to_string(), + methods: MethodSet::PASSWORD | MethodSet::PUBLICKEY | MethodSet::KEYBOARD_INTERACTIVE, + keys, + auth_rejection_time: std::time::Duration::from_secs(1), + ..thrussh::server::Config::default() + }); + + let (reload_send, reload_recv) = watch::channel(()); + let (shutdown_send, shutdown_recv) = oneshot::channel(); + + let (audit_send, audit_handle) = + audit::start_audit_writer(args.config.clone(), reload_recv, shutdown_recv); + let mut audit_handle = audit_handle.fuse(); + + let server = Server::new(hostname, args.config.clone(), audit_send); + let listen_address = args.config.listen_address.to_string(); + + // TODO: needs clean shutdowns on clients + let fut = thrussh::server::run(thrussh_config, &listen_address, server); + + let shutdown_watcher = watch_for_shutdown(shutdown_send); + let reload_watcher = watch_for_reloads(reload_send); + + tokio::select! { + res = fut => res?, + res = &mut audit_handle => res??, + res = shutdown_watcher => res?, + res = reload_watcher => res?, + } + + info!("Finishing audit log writes"); + audit_handle.await??; + info!("Audit log writes finished"); + + Ok(()) +} + +async fn watch_for_shutdown(send: oneshot::Sender<()>) -> Result<(), anyhow::Error> { + tokio::signal::ctrl_c().await?; + info!("Received ctrl-c, initiating shutdown"); + + let _res = send.send(()); + + Ok(()) +} + +async fn watch_for_reloads(send: watch::Sender<()>) -> Result<(), anyhow::Error> { + let mut signal = tokio::signal::unix::signal(SignalKind::hangup())?; + + while let Some(()) = signal.recv().await { + info!("Received SIGHUP, broadcasting reload"); + let _res = send.send(()); + } + + Ok(()) +} diff --git a/pisshoff-server/src/server.rs b/pisshoff-server/src/server.rs new file mode 100644 index 0000000..5e21685 --- /dev/null +++ b/pisshoff-server/src/server.rs @@ -0,0 +1,620 @@ +use crate::audit::{ + ExecCommandEvent, SignalEvent, SubsystemRequestEvent, TcpIpForwardEvent, WindowAdjustedEvent, + WindowChangeRequestEvent, +}; +use crate::{ + audit::{ + AuditLog, AuditLogAction, LoginAttemptEvent, OpenDirectTcpIpEvent, OpenX11Event, + PtyRequestEvent, X11RequestEvent, + }, + command::run_command, + config::Config, + state::State, +}; +use futures::{ + future::{BoxFuture, InspectErr}, + FutureExt, TryFutureExt, +}; +use std::{ + borrow::Cow, + future::Future, + net::SocketAddr, + pin::Pin, + sync::Arc, + task::{Context, Poll}, +}; +use thrussh::{ + server::{Auth, Response, Session}, + ChannelId, Pty, Sig, +}; +use thrussh_keys::key::PublicKey; +use tokio::sync::mpsc::UnboundedSender; +use tracing::{debug, error, info, info_span, instrument::Instrumented, Instrument, Span}; + +pub static KEYBOARD_INTERACTIVE_PROMPT: &[(Cow<'static, str>, bool)] = + &[(Cow::Borrowed("Password: "), false)]; +pub const SHELL_PROMPT: &str = "bash-5.1$ "; + +#[derive(Clone)] +pub struct Server { + config: Arc, + state: Arc, + hostname: &'static str, + audit_send: UnboundedSender, +} + +impl Server { + pub fn new( + hostname: &'static str, + config: Arc, + audit_send: UnboundedSender, + ) -> Self { + Self { + config, + hostname, + state: Arc::new(State::default()), + audit_send, + } + } +} + +impl thrussh::server::Server for Server { + type Handler = Connection; + + fn new(&mut self, peer_addr: Option) -> Self::Handler { + let connection_id = uuid::Uuid::new_v4(); + + Connection { + span: info_span!("connection", ?peer_addr, %connection_id), + server: self.clone(), + audit_log: AuditLog { + connection_id, + host: Cow::Borrowed(self.hostname), + peer_address: peer_addr, + ..AuditLog::default() + }, + } + } +} + +pub struct Connection { + span: Span, + server: Server, + audit_log: AuditLog, +} + +impl Connection { + fn try_login(&mut self, user: &str, password: &str) -> bool { + let res = if self + .server + .state + .previously_accepted_passwords + .seen(password) + { + info!(user, password, "Accepted login due to it being used before"); + true + } else if fastrand::f64() <= self.server.config.access_probability { + info!(user, password, "Accepted login randomly"); + self.server + .state + .previously_accepted_passwords + .store(password); + true + } else { + info!(?user, ?password, "Rejected login"); + false + }; + + self.audit_log.push_action(AuditLogAction::LoginAttempt( + LoginAttemptEvent::UsernamePassword { + username: Box::from(user), + password: Box::from(password), + }, + )); + + res + } +} + +impl thrussh::server::Handler for Connection { + type Error = anyhow::Error; + type FutureAuth = HandlerFuture; + type FutureUnit = HandlerFuture; + type FutureBool = + ServerFuture>>; + + fn finished_auth(self, auth: Auth) -> Self::FutureAuth { + let span = info_span!(parent: &self.span, "finished_auth"); + futures::future::ok((self, auth)).boxed().wrap(span) + } + + fn finished_bool(self, b: bool, session: Session) -> Self::FutureBool { + let span = info_span!(parent: &self.span, "finished_bool"); + let _entered = span.enter(); + + futures::future::ok((self, session, b)) + .boxed() + .wrap(Span::current()) + } + + fn finished(self, session: Session) -> Self::FutureUnit { + let span = info_span!(parent: &self.span, "finished"); + let _entered = span.enter(); + + futures::future::ok((self, session)) + .boxed() + .wrap(Span::current()) + } + + fn auth_none(self, _user: &str) -> Self::FutureAuth { + let span = info_span!(parent: &self.span, "auth_none"); + + self.finished_auth(Auth::UnsupportedMethod) + .boxed() + .wrap(span) + } + + fn auth_password(mut self, user: &str, password: &str) -> Self::FutureAuth { + let span = info_span!(parent: &self.span, "auth_password"); + let _entered = span.enter(); + + let res = if self.try_login(user, password) { + Auth::Accept + } else { + Auth::Reject + }; + + self.finished_auth(res) + } + + fn auth_publickey(mut self, _user: &str, public_key: &PublicKey) -> Self::FutureAuth { + let span = info_span!(parent: &self.span, "auth_publickey"); + let _entered = span.enter(); + + let kind = public_key.name(); + let fingerprint = public_key.fingerprint(); + + self.audit_log + .push_action(AuditLogAction::LoginAttempt(LoginAttemptEvent::PublicKey { + kind: Cow::Borrowed(kind), + fingerprint: Box::from(fingerprint), + })); + + self.finished_auth(Auth::Reject) + .boxed() + .wrap(Span::current()) + } + + fn auth_keyboard_interactive( + mut self, + user: &str, + _submethods: &str, + mut response: Option, + ) -> Self::FutureAuth { + let span = info_span!(parent: &self.span, "auth_keyboard_interactive"); + let _entered = span.enter(); + + let result = if let Some(password) = response + .as_mut() + .and_then(Response::next) + .map(String::from_utf8_lossy) + { + if self.try_login(user, password.as_ref()) { + Auth::Accept + } else { + Auth::Reject + } + } else { + debug!("Client is attempting keyboard-interactive, obliging"); + + Auth::Partial { + name: "".into(), + instructions: "".into(), + prompts: KEYBOARD_INTERACTIVE_PROMPT.into(), + } + }; + + self.finished_auth(result) + } + + fn channel_close(self, channel: ChannelId, mut session: Session) -> Self::FutureUnit { + let span = info_span!(parent: &self.span, "channel_close"); + let _entered = span.enter(); + + session.channel_success(channel); + self.finished(session).boxed().wrap(Span::current()) + } + + fn channel_eof(self, channel: ChannelId, mut session: Session) -> Self::FutureUnit { + let span = info_span!(parent: &self.span, "channel_eof"); + let _entered = span.enter(); + + info!("In here"); + + session.channel_success(channel); + self.finished(session).boxed().wrap(Span::current()) + } + + fn channel_open_session(self, channel: ChannelId, mut session: Session) -> Self::FutureUnit { + let span = info_span!(parent: &self.span, "channel_open_session"); + let _entered = span.enter(); + + session.channel_success(channel); + self.finished(session).boxed().wrap(Span::current()) + } + + fn channel_open_x11( + mut self, + channel: ChannelId, + originator_address: &str, + originator_port: u32, + mut session: Session, + ) -> Self::FutureUnit { + let span = info_span!(parent: &self.span, "channel_open_x11"); + let _entered = span.enter(); + + self.audit_log + .push_action(AuditLogAction::OpenX11(OpenX11Event { + originator_address: Box::from(originator_address), + originator_port, + })); + + session.channel_failure(channel); + self.finished(session).boxed().wrap(Span::current()) + } + + fn channel_open_direct_tcpip( + mut self, + channel: ChannelId, + host_to_connect: &str, + port_to_connect: u32, + originator_address: &str, + originator_port: u32, + mut session: Session, + ) -> Self::FutureUnit { + let span = info_span!(parent: &self.span, "channel_open_direct_tcpip"); + let _entered = span.enter(); + + self.audit_log + .push_action(AuditLogAction::OpenDirectTcpIp(OpenDirectTcpIpEvent { + host_to_connect: Box::from(host_to_connect), + port_to_connect, + originator_address: Box::from(originator_address), + originator_port, + })); + + session.channel_failure(channel); + self.finished(session).boxed().wrap(Span::current()) + } + + fn data(mut self, channel: ChannelId, data: &[u8], mut session: Session) -> Self::FutureUnit { + let span = info_span!(parent: &self.span, "data"); + let _entered = span.enter(); + + let data = shlex::split(String::from_utf8_lossy(data).as_ref()); + + async move { + if let Some(args) = data { + run_command(&args, channel, &mut session).await; + self.audit_log + .push_action(AuditLogAction::ExecCommand(ExecCommandEvent { + args: Box::from(args), + })); + } + + session.data(channel, SHELL_PROMPT.to_string().into()); + self.finished(session).await + } + .boxed() + .wrap(Span::current()) + } + + fn extended_data( + self, + _channel: ChannelId, + _code: u32, + _data: &[u8], + session: Session, + ) -> Self::FutureUnit { + let span = info_span!(parent: &self.span, "extended_data"); + let _entered = span.enter(); + + self.finished(session).boxed().wrap(Span::current()) + } + + fn window_adjusted( + mut self, + _channel: ChannelId, + new_window_size: usize, + session: Session, + ) -> Self::FutureUnit { + let span = info_span!(parent: &self.span, "window_adjusted"); + let _entered = span.enter(); + + self.audit_log + .push_action(AuditLogAction::WindowAdjusted(WindowAdjustedEvent { + new_size: new_window_size, + })); + + self.finished(session).boxed().wrap(Span::current()) + } + + fn adjust_window(&mut self, _channel: ChannelId, current: u32) -> u32 { + let span = info_span!(parent: &self.span, "adjust_window"); + let _entered = span.enter(); + + current + } + + fn pty_request( + mut self, + channel: ChannelId, + term: &str, + col_width: u32, + row_height: u32, + pix_width: u32, + pix_height: u32, + modes: &[(Pty, u32)], + mut session: Session, + ) -> Self::FutureUnit { + let span = info_span!(parent: &self.span, "pty_request"); + let _entered = span.enter(); + + self.audit_log + .push_action(AuditLogAction::PtyRequest(PtyRequestEvent { + term: Box::from(term), + col_width, + row_height, + pix_width, + pix_height, + modes: Box::from( + modes + .iter() + .copied() + .map(|(pty, val)| (pty as u8, val)) + .collect::>(), + ), + })); + + session.channel_failure(channel); + self.finished(session).boxed().wrap(Span::current()) + } + + fn x11_request( + mut self, + channel: ChannelId, + single_connection: bool, + x11_auth_protocol: &str, + x11_auth_cookie: &str, + x11_screen_number: u32, + mut session: Session, + ) -> Self::FutureUnit { + let span = info_span!(parent: &self.span, "x11_request"); + let _entered = span.enter(); + + self.audit_log + .push_action(AuditLogAction::X11Request(X11RequestEvent { + single_connection, + x11_auth_protocol: Box::from(x11_auth_protocol), + x11_auth_cookie: Box::from(x11_auth_cookie), + x11_screen_number, + })); + + session.channel_failure(channel); + self.finished(session).boxed().wrap(Span::current()) + } + + fn env_request( + mut self, + channel: ChannelId, + variable_name: &str, + variable_value: &str, + mut session: Session, + ) -> Self::FutureUnit { + let span = info_span!(parent: &self.span, "env_request"); + let _entered = span.enter(); + + self.audit_log + .environment_variables + .push((Box::from(variable_name), Box::from(variable_value))); + + session.channel_success(channel); + self.finished(session).boxed().wrap(Span::current()) + } + + fn shell_request(mut self, channel: ChannelId, mut session: Session) -> Self::FutureUnit { + let span = info_span!(parent: &self.span, "shell_request"); + let _entered = span.enter(); + + self.audit_log.push_action(AuditLogAction::ShellRequested); + + session.data(channel, SHELL_PROMPT.to_string().into()); + + session.channel_success(channel); + self.finished(session).boxed().wrap(Span::current()) + } + + fn exec_request( + mut self, + channel: ChannelId, + data: &[u8], + mut session: Session, + ) -> Self::FutureUnit { + let span = info_span!(parent: &self.span, "exec_request"); + let _entered = span.enter(); + + let data = shlex::split(String::from_utf8_lossy(data).as_ref()); + + async move { + if let Some(args) = data { + run_command(&args, channel, &mut session).await; + self.audit_log + .push_action(AuditLogAction::ExecCommand(ExecCommandEvent { + args: Box::from(args), + })); + + session.channel_success(channel); + } else { + session.channel_failure(channel); + } + + self.finished(session).await + } + .boxed() + .wrap(Span::current()) + } + + fn subsystem_request( + mut self, + channel: ChannelId, + name: &str, + mut session: Session, + ) -> Self::FutureUnit { + let span = info_span!(parent: &self.span, "subsystem_request"); + let _entered = span.enter(); + + self.audit_log + .push_action(AuditLogAction::SubsystemRequest(SubsystemRequestEvent { + name: Box::from(name), + })); + + session.channel_failure(channel); + self.finished(session).boxed().wrap(Span::current()) + } + + fn window_change_request( + mut self, + channel: ChannelId, + col_width: u32, + row_height: u32, + pix_width: u32, + pix_height: u32, + mut session: Session, + ) -> Self::FutureUnit { + let span = info_span!(parent: &self.span, "window_change_request"); + let _entered = span.enter(); + + self.audit_log + .push_action(AuditLogAction::WindowChangeRequest( + WindowChangeRequestEvent { + col_width, + row_height, + pix_width, + pix_height, + }, + )); + + session.channel_success(channel); + self.finished(session).boxed().wrap(Span::current()) + } + + fn signal( + mut self, + _channel: ChannelId, + signal_name: Sig, + session: Session, + ) -> Self::FutureUnit { + let span = info_span!(parent: &self.span, "signal"); + let _entered = span.enter(); + + self.audit_log + .push_action(AuditLogAction::Signal(SignalEvent { + name: format!("{signal_name:?}").into(), + })); + + self.finished(session).boxed().wrap(Span::current()) + } + + fn tcpip_forward(mut self, address: &str, port: u32, session: Session) -> Self::FutureBool { + let span = info_span!(parent: &self.span, "tcpip_forward"); + let _entered = span.enter(); + + self.audit_log + .push_action(AuditLogAction::TcpIpForward(TcpIpForwardEvent { + address: Box::from(address), + port, + })); + + self.finished_bool(false, session) + .boxed() + .wrap(Span::current()) + } + + fn cancel_tcpip_forward( + mut self, + address: &str, + port: u32, + session: Session, + ) -> Self::FutureBool { + let span = info_span!(parent: &self.span, "cancel_tcpip_forward"); + let _entered = span.enter(); + + self.audit_log + .push_action(AuditLogAction::CancelTcpIpForward(TcpIpForwardEvent { + address: Box::from(address), + port, + })); + + self.finished_bool(false, session) + .boxed() + .wrap(Span::current()) + } +} + +impl Drop for Connection { + fn drop(&mut self) { + let span = info_span!(parent: &self.span, "drop"); + let _entered = span.enter(); + + info!("Connection closed"); + + let _res = self + .server + .audit_send + .send(std::mem::take(&mut self.audit_log)); + } +} + +type HandlerResult = Result::Error>; +type HandlerFuture = ServerFuture< + ::Error, + BoxFuture<'static, HandlerResult<(Connection, T)>>, +>; + +/// Wraps a future, providing logging and instrumentation. This provides a newtype over the future +/// (`ServerFuture`) in order to enforce usage within the `thrussh::server::Handler` impl. +pub trait WrapFuture: Sized { + type Ok; + type Err; + + fn wrap(self, span: Span) -> ServerFuture; +} + +impl>> WrapFuture for F { + type Ok = T; + type Err = anyhow::Error; + + fn wrap(self, span: Span) -> ServerFuture { + ServerFuture( + self.inspect_err(log_err as fn(&anyhow::Error)) + .instrument(span), + ) + } +} + +/// Logs an error from a future result. +fn log_err(e: &anyhow::Error) { + error!("Connection closed due to: {}", e); +} + +/// A wrapped future, providing logging ad instrumentation. +#[allow(clippy::type_complexity)] +pub struct ServerFuture(Instrumented>); + +impl> + Unpin> Future for ServerFuture { + type Output = F::Output; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + Pin::new(&mut self.0).poll(cx) + } +} diff --git a/pisshoff-server/src/state.rs b/pisshoff-server/src/state.rs new file mode 100644 index 0000000..0ade91a --- /dev/null +++ b/pisshoff-server/src/state.rs @@ -0,0 +1,22 @@ +use parking_lot::RwLock; +use std::collections::HashSet; + +#[derive(Default)] +pub struct State { + /// A list of passwords that have previously been accepted, and will forever be accepted + /// to further attract the bear. + pub previously_accepted_passwords: StoredPasswords, +} + +#[derive(Default)] +pub struct StoredPasswords(RwLock>>); + +impl StoredPasswords { + pub fn seen(&self, password: &str) -> bool { + self.0.read().contains(password) + } + + pub fn store(&self, password: &str) -> bool { + self.0.write().insert(Box::from(password.to_string())) + } +} diff --git a/pisshoff-timescaledb-exporter/Cargo.toml b/pisshoff-timescaledb-exporter/Cargo.toml new file mode 100644 index 0000000..fba62ce --- /dev/null +++ b/pisshoff-timescaledb-exporter/Cargo.toml @@ -0,0 +1,23 @@ +[package] +name = "pisshoff-timescaledb-exporter" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +pisshoff-types = { path = "../pisshoff-types" } + +anyhow = "1.0" +clap = { version = "4.3", features = ["derive", "env", "cargo"] } +deadpool-postgres = { version = "0.10", features = ["rt_tokio_1", "serde"] } +futures = "0.3" +refinery = { version = "0.8", features = ["tokio-postgres"] } +tokio = { version = "1.28", features = ["full"] } +tokio-util = { version = "0.7", features = ["codec"] } +tokio-postgres = { version = "0.7", features = ["with-time-0_3", "with-uuid-1", "with-serde_json-1"] } +tracing = "0.1" +tracing-subscriber = { version = "0.3", features = ["env-filter"] } +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" +toml = "0.7" diff --git a/pisshoff-timescaledb-exporter/config.toml b/pisshoff-timescaledb-exporter/config.toml new file mode 100644 index 0000000..be607c3 --- /dev/null +++ b/pisshoff-timescaledb-exporter/config.toml @@ -0,0 +1,7 @@ +socket-path = "test.sock" + +[pg] +user = "postgres" +dbname = "pisshoff" +host = "127.0.0.1" +port = 64601 diff --git a/pisshoff-timescaledb-exporter/migrations/V1__initial.sql b/pisshoff-timescaledb-exporter/migrations/V1__initial.sql new file mode 100644 index 0000000..9d0eea6 --- /dev/null +++ b/pisshoff-timescaledb-exporter/migrations/V1__initial.sql @@ -0,0 +1,30 @@ +CREATE TABLE audit ( + timestamp TIMESTAMPTZ NOT NULL, + connection_id UUID NOT NULL, + peer_address TEXT NOT NULL, + host TEXT NOT NULL, + UNIQUE(timestamp) +); + +SELECT create_hypertable('audit', 'timestamp'); + +CREATE TABLE audit_environment_variables ( + connection_id uuid NOT NULL, + name TEXT NOT NULL, + value TEXT NOT NULL +); + +CREATE INDEX audit_environment_variables_connection_id ON audit_environment_variables USING HASH (connection_id); +CREATE INDEX audit_environment_variables_name ON audit_environment_variables USING HASH (name); + +CREATE TABLE audit_events ( + timestamp TIMESTAMPTZ NOT NULL, + connection_id UUID NOT NULL, + type TEXT NOT NULL, + content JSONB +); + +SELECT create_hypertable('audit_events', 'timestamp'); + +CREATE INDEX audit_events_connection_id ON audit_events USING HASH (connection_id); +CREATE INDEX audit_events_type ON audit_events USING HASH (type); \ No newline at end of file diff --git a/pisshoff-timescaledb-exporter/src/config.rs b/pisshoff-timescaledb-exporter/src/config.rs new file mode 100644 index 0000000..8cb9e34 --- /dev/null +++ b/pisshoff-timescaledb-exporter/src/config.rs @@ -0,0 +1,39 @@ +use clap::Parser; +use serde::{de::DeserializeOwned, Deserialize}; +use std::{io::ErrorKind, path::PathBuf, sync::Arc}; + +/// Parser for command line arguments +#[derive(Parser)] +#[command(author, version, about, long_about = None)] +pub struct Args { + #[arg(short, long, env, value_parser = load_config::)] + pub config: Arc, + #[arg(short, long, action = clap::ArgAction::Count)] + pub verbose: u8, +} + +impl Args { + pub fn verbosity(&self) -> &'static str { + match self.verbose { + 0 => "info", + 1 => "debug,thrussh=info", + 2 => "debug", + _ => "trace", + } + } +} + +#[derive(Deserialize)] +#[serde(rename_all = "kebab-case")] +pub struct Config { + pub socket_path: PathBuf, + pub pg: deadpool_postgres::Config, +} + +fn load_config(path: &str) -> Result, std::io::Error> { + let file = std::fs::read_to_string(path)?; + + toml::from_str(&file) + .map(Arc::new) + .map_err(|e| std::io::Error::new(ErrorKind::Other, e)) +} diff --git a/pisshoff-timescaledb-exporter/src/main.rs b/pisshoff-timescaledb-exporter/src/main.rs new file mode 100644 index 0000000..9cce26a --- /dev/null +++ b/pisshoff-timescaledb-exporter/src/main.rs @@ -0,0 +1,156 @@ +#![deny(clippy::pedantic)] +#![allow(clippy::module_name_repetitions)] + +use crate::config::Args; +use clap::Parser; +use deadpool_postgres::{ + tokio_postgres::{NoTls, Statement, Transaction}, + GenericClient, Runtime, +}; +use futures::{StreamExt, TryFutureExt}; +use pisshoff_types::audit::{AuditLog, AuditLogEvent}; +use std::sync::Arc; +use tokio::net::{UnixListener, UnixStream}; +use tokio_util::codec::{Decoder, LinesCodec}; +use tracing::{error, info}; +use tracing_subscriber::EnvFilter; + +mod config; + +mod embedded { + use refinery::embed_migrations; + embed_migrations!(); +} + +pub struct Context { + db: deadpool_postgres::Pool, +} + +#[tokio::main] +async fn main() { + if let Err(e) = run().await { + error!("Failed to run {}: {}", env!("CARGO_CRATE_NAME"), e); + std::process::exit(1); + } +} + +async fn run() -> anyhow::Result<()> { + let args = Args::parse(); + + std::env::set_var("RUST_LOG", args.verbosity()); + + tracing_subscriber::fmt() + .with_env_filter(EnvFilter::from_default_env()) + .init(); + + let db = args.config.pg.create_pool(Some(Runtime::Tokio1), NoTls)?; + let context = Arc::new(Context { db }); + + embedded::migrations::runner() + .run_async(&mut **context.db.get().await?) + .await?; + + spawn_listener(&args, context).await +} + +async fn spawn_listener(args: &Args, context: Arc) -> anyhow::Result<()> { + let listener = UnixListener::bind(&args.config.socket_path)?; + + loop { + let (stream, remote) = listener.accept().await?; + + info!(?remote, "Accepted incoming connection"); + + let context = context.clone(); + + tokio::spawn(async move { + if let Err(e) = handle_connection(stream, context).await { + error!("Connection failed: {e}"); + } + }); + } +} + +async fn handle_connection(stream: UnixStream, context: Arc) -> anyhow::Result<()> { + let mut framed = LinesCodec::new().framed(stream); + + while let Some(line) = framed.next().await.transpose()? { + let context = context.clone(); + + tokio::spawn( + ingest_log(context, line).inspect_err(|e| error!("Failed to ingest log: {e}")), + ); + } + + Ok(()) +} + +async fn ingest_log(context: Arc, line: String) -> anyhow::Result<()> { + let line: AuditLog = serde_json::from_str(&line)?; + + let Some(peer_address) = line.peer_address else { + return Ok(()); + }; + + let mut connection = context.db.get().await?; + let tx = connection.transaction().await?; + + tokio::try_join!( + async { + tx + .execute( + "INSERT INTO audit (timestamp, connection_id, peer_address, host) VALUES ($1, $2, $3, $4)", + &[&line.ts, &line.connection_id, &peer_address.to_string(), &line.host], + ) + .await + .map_err(anyhow::Error::from) + }, + async { + let prepared = tx.prepare("INSERT INTO audit_environment_variables (connection_id, name, value) VALUES ($1, $2, $3)").await?; + + futures::future::try_join_all( + line.environment_variables + .iter() + .map(|(key, value)| async { tx.execute(&prepared, &[key, value]).await }), + ) + .await + .map_err(anyhow::Error::from) + }, + async { + let prepared = tx.prepare("INSERT INTO audit_events (timestamp, connection_id, type, content) VALUES ($1, $2, $3, $4)").await?; + + futures::future::try_join_all( + line.events + .iter() + .map(|event| insert_event(&tx, &prepared, &line, event)), + ) + .await + } + )?; + + tx.commit().await?; + + Ok(()) +} + +async fn insert_event( + tx: &Transaction<'_>, + prepared: &Statement, + line: &AuditLog, + event: &AuditLogEvent, +) -> anyhow::Result<()> { + let ts = line.ts + event.start_offset; + + tx.execute( + prepared, + &[ + &ts, + &line.connection_id, + &<&'static str>::from(&event.action), + &serde_json::to_value(&event.action)?, + ], + ) + .await?; + + Ok(()) +} diff --git a/pisshoff-types/Cargo.toml b/pisshoff-types/Cargo.toml new file mode 100644 index 0000000..e224dc1 --- /dev/null +++ b/pisshoff-types/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "pisshoff-types" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +uuid = "1.3" +time = { version = "0.3", features = ["serde", "formatting", "parsing"] } +serde = { version = "1.0", features = ["derive"] } +strum = { version = "0.24", features = ["derive"] } diff --git a/pisshoff-types/src/audit.rs b/pisshoff-types/src/audit.rs new file mode 100644 index 0000000..392d01e --- /dev/null +++ b/pisshoff-types/src/audit.rs @@ -0,0 +1,162 @@ +use serde::{Deserialize, Serialize}; +use std::borrow::Cow; +use std::{ + fmt::{Debug, Formatter}, + net::SocketAddr, + time::{Duration, Instant}, +}; +use strum::IntoStaticStr; +use time::OffsetDateTime; +use uuid::Uuid; + +#[derive(Serialize, Deserialize)] +pub struct AuditLog { + pub connection_id: Uuid, + #[serde(with = "time::serde::rfc3339")] + pub ts: OffsetDateTime, + pub peer_address: Option, + pub host: Cow<'static, str>, + #[serde(skip_serializing_if = "Vec::is_empty", default)] + pub environment_variables: Vec<(Box, Box)>, + pub events: Vec, + #[serde(skip, default = "Instant::now")] + pub start: Instant, +} + +impl Default for AuditLog { + fn default() -> Self { + Self { + connection_id: Uuid::default(), + ts: OffsetDateTime::now_utc(), + host: Cow::Borrowed(""), + peer_address: None, + environment_variables: vec![], + events: vec![], + start: Instant::now(), + } + } +} + +impl Debug for AuditLog { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("AuditLog") + .field("connection_id", &self.connection_id) + .field("peer_address", &self.peer_address) + .field("environment_variables", &self.environment_variables) + .field("events", &self.events) + .finish() + } +} + +impl AuditLog { + pub fn push_action(&mut self, action: AuditLogAction) { + self.events.push(AuditLogEvent { + start_offset: self.start.elapsed(), + action, + }); + } +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct AuditLogEvent { + pub start_offset: Duration, + pub action: AuditLogAction, +} + +#[derive(Debug, Serialize, Deserialize, IntoStaticStr)] +#[serde(tag = "type", rename_all = "kebab-case")] +#[strum(serialize_all = "kebab-case")] +pub enum AuditLogAction { + LoginAttempt(LoginAttemptEvent), + PtyRequest(PtyRequestEvent), + X11Request(X11RequestEvent), + OpenX11(OpenX11Event), + OpenDirectTcpIp(OpenDirectTcpIpEvent), + ExecCommand(ExecCommandEvent), + WindowAdjusted(WindowAdjustedEvent), + ShellRequested, + SubsystemRequest(SubsystemRequestEvent), + WindowChangeRequest(WindowChangeRequestEvent), + Signal(SignalEvent), + TcpIpForward(TcpIpForwardEvent), + CancelTcpIpForward(TcpIpForwardEvent), +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct ExecCommandEvent { + pub args: Box<[String]>, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct WindowAdjustedEvent { + pub new_size: usize, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct SubsystemRequestEvent { + pub name: Box, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct SignalEvent { + pub name: Box, +} + +#[derive(Debug, Serialize, Deserialize)] +#[serde(tag = "credential-type", rename_all = "kebab-case")] +pub enum LoginAttemptEvent { + UsernamePassword { + username: Box, + password: Box, + }, + PublicKey { + kind: Cow<'static, str>, + fingerprint: Box, + }, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct PtyRequestEvent { + pub term: Box, + pub col_width: u32, + pub row_height: u32, + pub pix_width: u32, + pub pix_height: u32, + pub modes: Box<[(u8, u32)]>, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct OpenX11Event { + pub originator_address: Box, + pub originator_port: u32, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct X11RequestEvent { + pub single_connection: bool, + pub x11_auth_protocol: Box, + pub x11_auth_cookie: Box, + pub x11_screen_number: u32, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct OpenDirectTcpIpEvent { + pub host_to_connect: Box, + pub port_to_connect: u32, + pub originator_address: Box, + pub originator_port: u32, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct WindowChangeRequestEvent { + pub col_width: u32, + pub row_height: u32, + pub pix_width: u32, + pub pix_height: u32, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct TcpIpForwardEvent { + pub address: Box, + pub port: u32, +} diff --git a/pisshoff-types/src/lib.rs b/pisshoff-types/src/lib.rs new file mode 100644 index 0000000..1827072 --- /dev/null +++ b/pisshoff-types/src/lib.rs @@ -0,0 +1,4 @@ +#![deny(clippy::pedantic)] +#![allow(clippy::module_name_repetitions)] + +pub mod audit; diff --git a/src/audit.rs b/src/audit.rs deleted file mode 100644 index b340708..0000000 --- a/src/audit.rs +++ /dev/null @@ -1,233 +0,0 @@ -use crate::config::Config; -use serde::Serialize; -use std::{ - fmt::{Debug, Formatter}, - io::ErrorKind, - net::SocketAddr, - sync::Arc, - time::{Duration, Instant}, -}; -use time::OffsetDateTime; -use tokio::{ - fs::OpenOptions, - io::{AsyncWriteExt, BufWriter}, - sync::{oneshot, watch}, - task::JoinHandle, -}; -use tracing::{debug, info}; -use uuid::Uuid; - -pub fn start_audit_writer( - config: Arc, - mut reload: watch::Receiver<()>, - mut shutdown_recv: oneshot::Receiver<()>, -) -> ( - tokio::sync::mpsc::UnboundedSender, - JoinHandle>, -) { - let (send, mut recv) = tokio::sync::mpsc::unbounded_channel(); - - let handle = tokio::spawn(async move { - let open_writer = || async { - let file = OpenOptions::default() - .create(true) - .append(true) - .open(&config.audit_output_file) - .await?; - Ok::<_, std::io::Error>(BufWriter::new(file)) - }; - - let mut writer = open_writer().await?; - let mut shutdown = false; - - while !shutdown { - tokio::select! { - log = recv.recv() => { - match log { - Some(log) => { - let log = serde_json::to_vec(&log) - .map_err(|e| std::io::Error::new(ErrorKind::Other, e))?; - writer.write_all(&log).await?; - writer.write_all("\n".as_bytes()).await?; - } - None => { - shutdown = true; - } - } - } - _ = &mut shutdown_recv => { - shutdown = true; - } - _ = tokio::time::sleep(Duration::from_secs(5)), if !writer.buffer().is_empty() => { - debug!("Flushing audits to disk"); - writer.flush().await?; - } - Ok(()) = reload.changed() => { - info!("Flushing audits to disk"); - writer.flush().await?; - - info!("Reopening handle to log file"); - writer = open_writer().await?; - - info!("Successfully re-opened log file"); - } - else => break, - } - } - - writer.flush().await?; - - Ok(()) - }); - - (send, handle) -} - -#[derive(Serialize)] -pub struct AuditLog { - pub connection_id: Uuid, - #[serde(with = "time::serde::rfc3339")] - pub ts: OffsetDateTime, - pub peer_address: Option, - #[serde(skip_serializing_if = "Vec::is_empty")] - pub environment_variables: Vec<(Box, Box)>, - pub events: Vec, - #[serde(skip)] - pub start: Instant, -} - -impl Default for AuditLog { - fn default() -> Self { - Self { - connection_id: Uuid::default(), - ts: OffsetDateTime::now_utc(), - peer_address: None, - environment_variables: vec![], - events: vec![], - start: Instant::now(), - } - } -} - -impl Debug for AuditLog { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - f.debug_struct("AuditLog") - .field("connection_id", &self.connection_id) - .field("peer_address", &self.peer_address) - .field("environment_variables", &self.environment_variables) - .field("events", &self.events) - .finish() - } -} - -impl AuditLog { - pub fn push_action(&mut self, action: AuditLogAction) { - self.events.push(AuditLogEvent { - start_offset: self.start.elapsed(), - action, - }); - } -} - -#[derive(Debug, Serialize)] -pub struct AuditLogEvent { - pub start_offset: Duration, - pub action: AuditLogAction, -} - -#[derive(Debug, Serialize)] -#[serde(tag = "type", rename_all = "kebab-case")] -pub enum AuditLogAction { - LoginAttempt(LoginAttemptEvent), - PtyRequest(PtyRequestEvent), - X11Request(X11RequestEvent), - OpenX11(OpenX11Event), - OpenDirectTcpIp(OpenDirectTcpIpEvent), - ExecCommand(ExecCommandEvent), - WindowAdjusted(WindowAdjustedEvent), - ShellRequested, - SubsystemRequest(SubsystemRequestEvent), - WindowChangeRequest(WindowChangeRequestEvent), - Signal(SignalEvent), - TcpIpForward(TcpIpForwardEvent), - CancelTcpIpForward(TcpIpForwardEvent), -} - -#[derive(Debug, Serialize)] -pub struct ExecCommandEvent { - pub args: Box<[String]>, -} - -#[derive(Debug, Serialize)] -pub struct WindowAdjustedEvent { - pub new_size: usize, -} - -#[derive(Debug, Serialize)] -pub struct SubsystemRequestEvent { - pub name: Box, -} - -#[derive(Debug, Serialize)] -pub struct SignalEvent { - pub name: Box, -} - -#[derive(Debug, Serialize)] -#[serde(tag = "credential-type", rename_all = "kebab-case")] -pub enum LoginAttemptEvent { - UsernamePassword { - username: Box, - password: Box, - }, - PublicKey { - kind: &'static str, - fingerprint: Box, - }, -} - -#[derive(Debug, Serialize)] -pub struct PtyRequestEvent { - pub term: Box, - pub col_width: u32, - pub row_height: u32, - pub pix_width: u32, - pub pix_height: u32, - pub modes: Box<[(u8, u32)]>, -} - -#[derive(Debug, Serialize)] -pub struct OpenX11Event { - pub originator_address: Box, - pub originator_port: u32, -} - -#[derive(Debug, Serialize)] -pub struct X11RequestEvent { - pub single_connection: bool, - pub x11_auth_protocol: Box, - pub x11_auth_cookie: Box, - pub x11_screen_number: u32, -} - -#[derive(Debug, Serialize)] -pub struct OpenDirectTcpIpEvent { - pub host_to_connect: Box, - pub port_to_connect: u32, - pub originator_address: Box, - pub originator_port: u32, -} - -#[derive(Debug, Serialize)] -pub struct WindowChangeRequestEvent { - pub col_width: u32, - pub row_height: u32, - pub pix_width: u32, - pub pix_height: u32, -} - -#[derive(Debug, Serialize)] -pub struct TcpIpForwardEvent { - pub address: Box, - pub port: u32, -} diff --git a/src/command.rs b/src/command.rs deleted file mode 100644 index 7929f7a..0000000 --- a/src/command.rs +++ /dev/null @@ -1,51 +0,0 @@ -use itertools::Itertools; -use std::{f32, str::FromStr, time::Duration}; -use thrussh::{server::Session, ChannelId}; - -pub async fn run_command(args: &[String], channel: ChannelId, session: &mut Session) { - let Some(command) = args.get(0) else { - return; - }; - - match command.as_str() { - "echo" => { - session.data( - channel, - format!("{}\n", args.iter().skip(1).join(" ")).into(), - ); - } - "whoami" => { - // TODO: grab "logged in" user - session.data(channel, "root\n".to_string().into()); - } - "pwd" => { - // TODO: mock FHS - session.data(channel, "/root\n".to_string().into()); - } - "ls" => { - // pretend /root is empty until we mock the FHS - } - "exit" => { - let exit_status = args - .get(1) - .map(String::as_str) - .map_or(Ok(0), u32::from_str) - .unwrap_or(2); - - session.exit_status_request(channel, exit_status); - session.close(channel); - } - "sleep" => { - if let Some(Ok(secs)) = args.get(1).map(String::as_str).map(f32::from_str) { - tokio::time::sleep(Duration::from_secs_f32(secs)).await; - } - } - other => { - // TODO: fix stderr displaying out of order - session.data( - channel, - format!("bash: {other}: command not found\n").into(), - ); - } - } -} diff --git a/src/config.rs b/src/config.rs deleted file mode 100644 index 753d8eb..0000000 --- a/src/config.rs +++ /dev/null @@ -1,72 +0,0 @@ -use clap::Parser; -use serde::{de::DeserializeOwned, Deserialize}; -use std::path::PathBuf; -use std::sync::Arc; -use std::{io::ErrorKind, net::SocketAddr}; - -/// Parser for command line arguments, these arguments can also be passed via capitalised env vars -/// of the same name. -#[derive(Parser)] -#[command(author, version, about, long_about = None)] -pub struct Args { - #[arg(short, long, env, value_parser = load_config::)] - pub config: Arc, - #[arg(short, long, action = clap::ArgAction::Count)] - pub verbose: u8, -} - -impl Args { - pub fn verbosity(&self) -> &'static str { - match self.verbose { - 0 => "info", - 1 => "debug,thrussh=info", - 2 => "debug", - _ => "trace", - } - } -} - -#[derive(Deserialize, Clone)] -#[serde(rename_all = "kebab-case")] -pub struct Config { - /// Address for the server to listen on. - #[serde(default = "Config::default_listen_address")] - pub listen_address: SocketAddr, - /// The probability that an authentication attempt will succeed, once a given password - /// has been accepted once - it will be accepted for the rest of the lifetime of the - /// instance. - #[serde(default = "Config::default_access_probability")] - pub access_probability: f64, - /// Path of the file to write audit logs to. - #[serde(default = "Config::default_audit_output_file")] - pub audit_output_file: PathBuf, - /// The server ID string sent at the beginning of the SSH connection. - #[serde(default = "Config::default_server_id")] - pub server_id: String, -} - -impl Config { - fn default_listen_address() -> SocketAddr { - "0.0.0.0:22".parse().unwrap() - } - - fn default_access_probability() -> f64 { - 0.2 - } - - fn default_audit_output_file() -> PathBuf { - "/var/log/pisshoff/audit.log".parse().unwrap() - } - - fn default_server_id() -> String { - "SSH-2.0-OpenSSH_9.3".to_string() - } -} - -fn load_config(path: &str) -> Result, std::io::Error> { - let file = std::fs::read_to_string(path)?; - - toml::from_str(&file) - .map(Arc::new) - .map_err(|e| std::io::Error::new(ErrorKind::Other, e)) -} diff --git a/src/main.rs b/src/main.rs deleted file mode 100644 index d53ac86..0000000 --- a/src/main.rs +++ /dev/null @@ -1,103 +0,0 @@ -#![deny(clippy::pedantic)] -#![allow(clippy::module_name_repetitions)] - -use crate::{config::Args, server::Server}; -use clap::Parser; -use futures::FutureExt; -use std::sync::Arc; -use thrussh::MethodSet; -use tokio::{ - signal::unix::SignalKind, - sync::{oneshot, watch}, -}; -use tracing::{error, info}; -use tracing_subscriber::EnvFilter; - -mod audit; -mod command; -mod config; -mod server; -mod state; - -#[tokio::main] -async fn main() { - if let Err(e) = run().await { - error!("Failed to run {}: {}", env!("CARGO_CRATE_NAME"), e); - std::process::exit(1); - } -} - -async fn run() -> anyhow::Result<()> { - let args = Args::parse(); - - std::env::set_var("RUST_LOG", args.verbosity()); - - tracing_subscriber::fmt() - .with_env_filter(EnvFilter::from_default_env()) - .init(); - - info!( - "{} listening on {}", - env!("CARGO_CRATE_NAME"), - args.config.listen_address - ); - - let keys = vec![thrussh_keys::key::KeyPair::generate_ed25519().unwrap()]; - - let thrussh_config = Arc::new(thrussh::server::Config { - server_id: args.config.server_id.to_string(), - methods: MethodSet::PASSWORD | MethodSet::PUBLICKEY | MethodSet::KEYBOARD_INTERACTIVE, - keys, - auth_rejection_time: std::time::Duration::from_secs(1), - ..thrussh::server::Config::default() - }); - - let (reload_send, reload_recv) = watch::channel(()); - let (shutdown_send, shutdown_recv) = oneshot::channel(); - - let (audit_send, audit_handle) = - audit::start_audit_writer(args.config.clone(), reload_recv, shutdown_recv); - let mut audit_handle = audit_handle.fuse(); - - let server = Server::new(args.config.clone(), audit_send); - let listen_address = args.config.listen_address.to_string(); - - // TODO: needs clean shutdowns on clients - let fut = thrussh::server::run(thrussh_config, &listen_address, server); - - let shutdown_watcher = watch_for_shutdown(shutdown_send); - let reload_watcher = watch_for_reloads(reload_send); - - tokio::select! { - res = fut => res?, - res = &mut audit_handle => res??, - res = shutdown_watcher => res?, - res = reload_watcher => res?, - } - - info!("Finishing audit log writes"); - audit_handle.await??; - info!("Audit log writes finished"); - - Ok(()) -} - -async fn watch_for_shutdown(send: oneshot::Sender<()>) -> Result<(), anyhow::Error> { - tokio::signal::ctrl_c().await?; - info!("Received ctrl-c, initiating shutdown"); - - let _res = send.send(()); - - Ok(()) -} - -async fn watch_for_reloads(send: watch::Sender<()>) -> Result<(), anyhow::Error> { - let mut signal = tokio::signal::unix::signal(SignalKind::hangup())?; - - while let Some(()) = signal.recv().await { - info!("Received SIGHUP, broadcasting reload"); - let _res = send.send(()); - } - - Ok(()) -} diff --git a/src/server.rs b/src/server.rs deleted file mode 100644 index f3437a6..0000000 --- a/src/server.rs +++ /dev/null @@ -1,613 +0,0 @@ -use crate::audit::{ - ExecCommandEvent, SignalEvent, SubsystemRequestEvent, TcpIpForwardEvent, WindowAdjustedEvent, - WindowChangeRequestEvent, -}; -use crate::{ - audit::{ - AuditLog, AuditLogAction, LoginAttemptEvent, OpenDirectTcpIpEvent, OpenX11Event, - PtyRequestEvent, X11RequestEvent, - }, - command::run_command, - config::Config, - state::State, -}; -use futures::{ - future::{BoxFuture, InspectErr}, - FutureExt, TryFutureExt, -}; -use std::{ - borrow::Cow, - future::Future, - net::SocketAddr, - pin::Pin, - sync::Arc, - task::{Context, Poll}, -}; -use thrussh::{ - server::{Auth, Response, Session}, - ChannelId, Pty, Sig, -}; -use thrussh_keys::key::PublicKey; -use tokio::sync::mpsc::UnboundedSender; -use tracing::{debug, error, info, info_span, instrument::Instrumented, Instrument, Span}; - -pub static KEYBOARD_INTERACTIVE_PROMPT: &[(Cow<'static, str>, bool)] = - &[(Cow::Borrowed("Password: "), false)]; -pub const SHELL_PROMPT: &str = "bash-5.1$ "; - -#[derive(Clone)] -pub struct Server { - config: Arc, - state: Arc, - audit_send: UnboundedSender, -} - -impl Server { - pub fn new(config: Arc, audit_send: UnboundedSender) -> Self { - Self { - config, - state: Arc::new(State::default()), - audit_send, - } - } -} - -impl thrussh::server::Server for Server { - type Handler = Connection; - - fn new(&mut self, peer_addr: Option) -> Self::Handler { - let connection_id = uuid::Uuid::new_v4(); - - Connection { - span: info_span!("connection", ?peer_addr, %connection_id), - server: self.clone(), - audit_log: AuditLog { - connection_id, - peer_address: peer_addr, - ..AuditLog::default() - }, - } - } -} - -pub struct Connection { - span: Span, - server: Server, - audit_log: AuditLog, -} - -impl Connection { - fn try_login(&mut self, user: &str, password: &str) -> bool { - let res = if self - .server - .state - .previously_accepted_passwords - .seen(password) - { - info!(user, password, "Accepted login due to it being used before"); - true - } else if fastrand::f64() <= self.server.config.access_probability { - info!(user, password, "Accepted login randomly"); - self.server - .state - .previously_accepted_passwords - .store(password); - true - } else { - info!(?user, ?password, "Rejected login"); - false - }; - - self.audit_log.push_action(AuditLogAction::LoginAttempt( - LoginAttemptEvent::UsernamePassword { - username: Box::from(user), - password: Box::from(password), - }, - )); - - res - } -} - -impl thrussh::server::Handler for Connection { - type Error = anyhow::Error; - type FutureAuth = HandlerFuture; - type FutureUnit = HandlerFuture; - type FutureBool = - ServerFuture>>; - - fn finished_auth(self, auth: Auth) -> Self::FutureAuth { - let span = info_span!(parent: &self.span, "finished_auth"); - futures::future::ok((self, auth)).boxed().wrap(span) - } - - fn finished_bool(self, b: bool, session: Session) -> Self::FutureBool { - let span = info_span!(parent: &self.span, "finished_bool"); - let _entered = span.enter(); - - futures::future::ok((self, session, b)) - .boxed() - .wrap(Span::current()) - } - - fn finished(self, session: Session) -> Self::FutureUnit { - let span = info_span!(parent: &self.span, "finished"); - let _entered = span.enter(); - - futures::future::ok((self, session)) - .boxed() - .wrap(Span::current()) - } - - fn auth_none(self, _user: &str) -> Self::FutureAuth { - let span = info_span!(parent: &self.span, "auth_none"); - - self.finished_auth(Auth::UnsupportedMethod) - .boxed() - .wrap(span) - } - - fn auth_password(mut self, user: &str, password: &str) -> Self::FutureAuth { - let span = info_span!(parent: &self.span, "auth_password"); - let _entered = span.enter(); - - let res = if self.try_login(user, password) { - Auth::Accept - } else { - Auth::Reject - }; - - self.finished_auth(res) - } - - fn auth_publickey(mut self, _user: &str, public_key: &PublicKey) -> Self::FutureAuth { - let span = info_span!(parent: &self.span, "auth_publickey"); - let _entered = span.enter(); - - let kind = public_key.name(); - let fingerprint = public_key.fingerprint(); - - self.audit_log - .push_action(AuditLogAction::LoginAttempt(LoginAttemptEvent::PublicKey { - kind, - fingerprint: Box::from(fingerprint), - })); - - self.finished_auth(Auth::Reject) - .boxed() - .wrap(Span::current()) - } - - fn auth_keyboard_interactive( - mut self, - user: &str, - _submethods: &str, - mut response: Option, - ) -> Self::FutureAuth { - let span = info_span!(parent: &self.span, "auth_keyboard_interactive"); - let _entered = span.enter(); - - let result = if let Some(password) = response - .as_mut() - .and_then(Response::next) - .map(String::from_utf8_lossy) - { - if self.try_login(user, password.as_ref()) { - Auth::Accept - } else { - Auth::Reject - } - } else { - debug!("Client is attempting keyboard-interactive, obliging"); - - Auth::Partial { - name: "".into(), - instructions: "".into(), - prompts: KEYBOARD_INTERACTIVE_PROMPT.into(), - } - }; - - self.finished_auth(result) - } - - fn channel_close(self, channel: ChannelId, mut session: Session) -> Self::FutureUnit { - let span = info_span!(parent: &self.span, "channel_close"); - let _entered = span.enter(); - - session.channel_success(channel); - self.finished(session).boxed().wrap(Span::current()) - } - - fn channel_eof(self, channel: ChannelId, mut session: Session) -> Self::FutureUnit { - let span = info_span!(parent: &self.span, "channel_eof"); - let _entered = span.enter(); - - info!("In here"); - - session.channel_success(channel); - self.finished(session).boxed().wrap(Span::current()) - } - - fn channel_open_session(self, channel: ChannelId, mut session: Session) -> Self::FutureUnit { - let span = info_span!(parent: &self.span, "channel_open_session"); - let _entered = span.enter(); - - session.channel_success(channel); - self.finished(session).boxed().wrap(Span::current()) - } - - fn channel_open_x11( - mut self, - channel: ChannelId, - originator_address: &str, - originator_port: u32, - mut session: Session, - ) -> Self::FutureUnit { - let span = info_span!(parent: &self.span, "channel_open_x11"); - let _entered = span.enter(); - - self.audit_log - .push_action(AuditLogAction::OpenX11(OpenX11Event { - originator_address: Box::from(originator_address), - originator_port, - })); - - session.channel_failure(channel); - self.finished(session).boxed().wrap(Span::current()) - } - - fn channel_open_direct_tcpip( - mut self, - channel: ChannelId, - host_to_connect: &str, - port_to_connect: u32, - originator_address: &str, - originator_port: u32, - mut session: Session, - ) -> Self::FutureUnit { - let span = info_span!(parent: &self.span, "channel_open_direct_tcpip"); - let _entered = span.enter(); - - self.audit_log - .push_action(AuditLogAction::OpenDirectTcpIp(OpenDirectTcpIpEvent { - host_to_connect: Box::from(host_to_connect), - port_to_connect, - originator_address: Box::from(originator_address), - originator_port, - })); - - session.channel_failure(channel); - self.finished(session).boxed().wrap(Span::current()) - } - - fn data(mut self, channel: ChannelId, data: &[u8], mut session: Session) -> Self::FutureUnit { - let span = info_span!(parent: &self.span, "data"); - let _entered = span.enter(); - - let data = shlex::split(String::from_utf8_lossy(data).as_ref()); - - async move { - if let Some(args) = data { - run_command(&args, channel, &mut session).await; - self.audit_log - .push_action(AuditLogAction::ExecCommand(ExecCommandEvent { - args: Box::from(args), - })); - } - - session.data(channel, SHELL_PROMPT.to_string().into()); - self.finished(session).await - } - .boxed() - .wrap(Span::current()) - } - - fn extended_data( - self, - _channel: ChannelId, - _code: u32, - _data: &[u8], - session: Session, - ) -> Self::FutureUnit { - let span = info_span!(parent: &self.span, "extended_data"); - let _entered = span.enter(); - - self.finished(session).boxed().wrap(Span::current()) - } - - fn window_adjusted( - mut self, - _channel: ChannelId, - new_window_size: usize, - session: Session, - ) -> Self::FutureUnit { - let span = info_span!(parent: &self.span, "window_adjusted"); - let _entered = span.enter(); - - self.audit_log - .push_action(AuditLogAction::WindowAdjusted(WindowAdjustedEvent { - new_size: new_window_size, - })); - - self.finished(session).boxed().wrap(Span::current()) - } - - fn adjust_window(&mut self, _channel: ChannelId, current: u32) -> u32 { - let span = info_span!(parent: &self.span, "adjust_window"); - let _entered = span.enter(); - - current - } - - fn pty_request( - mut self, - channel: ChannelId, - term: &str, - col_width: u32, - row_height: u32, - pix_width: u32, - pix_height: u32, - modes: &[(Pty, u32)], - mut session: Session, - ) -> Self::FutureUnit { - let span = info_span!(parent: &self.span, "pty_request"); - let _entered = span.enter(); - - self.audit_log - .push_action(AuditLogAction::PtyRequest(PtyRequestEvent { - term: Box::from(term), - col_width, - row_height, - pix_width, - pix_height, - modes: Box::from( - modes - .iter() - .copied() - .map(|(pty, val)| (pty as u8, val)) - .collect::>(), - ), - })); - - session.channel_failure(channel); - self.finished(session).boxed().wrap(Span::current()) - } - - fn x11_request( - mut self, - channel: ChannelId, - single_connection: bool, - x11_auth_protocol: &str, - x11_auth_cookie: &str, - x11_screen_number: u32, - mut session: Session, - ) -> Self::FutureUnit { - let span = info_span!(parent: &self.span, "x11_request"); - let _entered = span.enter(); - - self.audit_log - .push_action(AuditLogAction::X11Request(X11RequestEvent { - single_connection, - x11_auth_protocol: Box::from(x11_auth_protocol), - x11_auth_cookie: Box::from(x11_auth_cookie), - x11_screen_number, - })); - - session.channel_failure(channel); - self.finished(session).boxed().wrap(Span::current()) - } - - fn env_request( - mut self, - channel: ChannelId, - variable_name: &str, - variable_value: &str, - mut session: Session, - ) -> Self::FutureUnit { - let span = info_span!(parent: &self.span, "env_request"); - let _entered = span.enter(); - - self.audit_log - .environment_variables - .push((Box::from(variable_name), Box::from(variable_value))); - - session.channel_success(channel); - self.finished(session).boxed().wrap(Span::current()) - } - - fn shell_request(mut self, channel: ChannelId, mut session: Session) -> Self::FutureUnit { - let span = info_span!(parent: &self.span, "shell_request"); - let _entered = span.enter(); - - self.audit_log.push_action(AuditLogAction::ShellRequested); - - session.data(channel, SHELL_PROMPT.to_string().into()); - - session.channel_success(channel); - self.finished(session).boxed().wrap(Span::current()) - } - - fn exec_request( - mut self, - channel: ChannelId, - data: &[u8], - mut session: Session, - ) -> Self::FutureUnit { - let span = info_span!(parent: &self.span, "exec_request"); - let _entered = span.enter(); - - let data = shlex::split(String::from_utf8_lossy(data).as_ref()); - - async move { - if let Some(args) = data { - run_command(&args, channel, &mut session).await; - self.audit_log - .push_action(AuditLogAction::ExecCommand(ExecCommandEvent { - args: Box::from(args), - })); - - session.channel_success(channel); - } else { - session.channel_failure(channel); - } - - self.finished(session).await - } - .boxed() - .wrap(Span::current()) - } - - fn subsystem_request( - mut self, - channel: ChannelId, - name: &str, - mut session: Session, - ) -> Self::FutureUnit { - let span = info_span!(parent: &self.span, "subsystem_request"); - let _entered = span.enter(); - - self.audit_log - .push_action(AuditLogAction::SubsystemRequest(SubsystemRequestEvent { - name: Box::from(name), - })); - - session.channel_failure(channel); - self.finished(session).boxed().wrap(Span::current()) - } - - fn window_change_request( - mut self, - channel: ChannelId, - col_width: u32, - row_height: u32, - pix_width: u32, - pix_height: u32, - mut session: Session, - ) -> Self::FutureUnit { - let span = info_span!(parent: &self.span, "window_change_request"); - let _entered = span.enter(); - - self.audit_log - .push_action(AuditLogAction::WindowChangeRequest( - WindowChangeRequestEvent { - col_width, - row_height, - pix_width, - pix_height, - }, - )); - - session.channel_success(channel); - self.finished(session).boxed().wrap(Span::current()) - } - - fn signal( - mut self, - _channel: ChannelId, - signal_name: Sig, - session: Session, - ) -> Self::FutureUnit { - let span = info_span!(parent: &self.span, "signal"); - let _entered = span.enter(); - - self.audit_log - .push_action(AuditLogAction::Signal(SignalEvent { - name: format!("{signal_name:?}").into(), - })); - - self.finished(session).boxed().wrap(Span::current()) - } - - fn tcpip_forward(mut self, address: &str, port: u32, session: Session) -> Self::FutureBool { - let span = info_span!(parent: &self.span, "tcpip_forward"); - let _entered = span.enter(); - - self.audit_log - .push_action(AuditLogAction::TcpIpForward(TcpIpForwardEvent { - address: Box::from(address), - port, - })); - - self.finished_bool(false, session) - .boxed() - .wrap(Span::current()) - } - - fn cancel_tcpip_forward( - mut self, - address: &str, - port: u32, - session: Session, - ) -> Self::FutureBool { - let span = info_span!(parent: &self.span, "cancel_tcpip_forward"); - let _entered = span.enter(); - - self.audit_log - .push_action(AuditLogAction::CancelTcpIpForward(TcpIpForwardEvent { - address: Box::from(address), - port, - })); - - self.finished_bool(false, session) - .boxed() - .wrap(Span::current()) - } -} - -impl Drop for Connection { - fn drop(&mut self) { - let span = info_span!(parent: &self.span, "drop"); - let _entered = span.enter(); - - info!("Connection closed"); - - let _res = self - .server - .audit_send - .send(std::mem::take(&mut self.audit_log)); - } -} - -type HandlerResult = Result::Error>; -type HandlerFuture = ServerFuture< - ::Error, - BoxFuture<'static, HandlerResult<(Connection, T)>>, ->; - -/// Wraps a future, providing logging and instrumentation. This provides a newtype over the future -/// (`ServerFuture`) in order to enforce usage within the `thrussh::server::Handler` impl. -pub trait WrapFuture: Sized { - type Ok; - type Err; - - fn wrap(self, span: Span) -> ServerFuture; -} - -impl>> WrapFuture for F { - type Ok = T; - type Err = anyhow::Error; - - fn wrap(self, span: Span) -> ServerFuture { - ServerFuture( - self.inspect_err(log_err as fn(&anyhow::Error)) - .instrument(span), - ) - } -} - -/// Logs an error from a future result. -fn log_err(e: &anyhow::Error) { - error!("Connection closed due to: {}", e); -} - -/// A wrapped future, providing logging ad instrumentation. -#[allow(clippy::type_complexity)] -pub struct ServerFuture(Instrumented>); - -impl> + Unpin> Future for ServerFuture { - type Output = F::Output; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - Pin::new(&mut self.0).poll(cx) - } -} diff --git a/src/state.rs b/src/state.rs deleted file mode 100644 index 0ade91a..0000000 --- a/src/state.rs +++ /dev/null @@ -1,22 +0,0 @@ -use parking_lot::RwLock; -use std::collections::HashSet; - -#[derive(Default)] -pub struct State { - /// A list of passwords that have previously been accepted, and will forever be accepted - /// to further attract the bear. - pub previously_accepted_passwords: StoredPasswords, -} - -#[derive(Default)] -pub struct StoredPasswords(RwLock>>); - -impl StoredPasswords { - pub fn seen(&self, password: &str) -> bool { - self.0.read().contains(password) - } - - pub fn store(&self, password: &str) -> bool { - self.0.write().insert(Box::from(password.to_string())) - } -} -- libgit2 1.7.2