Add timescaledb exporter application
Diff
Cargo.lock | 441 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Cargo.toml | 31 ++++++-------------------------
config.toml | 10 ----------
flake.nix | 84 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-----
pisshoff-server/Cargo.toml | 27 +++++++++++++++++++++++++++
pisshoff-server/config.toml | 10 ++++++++++
pisshoff-timescaledb-exporter/Cargo.toml | 23 +++++++++++++++++++++++
pisshoff-timescaledb-exporter/config.toml | 7 +++++++
pisshoff-types/Cargo.toml | 12 ++++++++++++
src/audit.rs | 233 --------------------------------------------------------------------------------
src/command.rs | 51 ---------------------------------------------------
src/config.rs | 72 ------------------------------------------------------------------------
src/main.rs | 103 --------------------------------------------------------------------------------
src/server.rs | 613 --------------------------------------------------------------------------------
src/state.rs | 22 ----------------------
pisshoff-server/src/audit.rs | 76 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
pisshoff-server/src/command.rs | 51 +++++++++++++++++++++++++++++++++++++++++++++++++++
pisshoff-server/src/config.rs | 72 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
pisshoff-server/src/main.rs | 110 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
pisshoff-server/src/server.rs | 620 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
pisshoff-server/src/state.rs | 22 ++++++++++++++++++++++
pisshoff-timescaledb-exporter/migrations/V1__initial.sql | 30 ++++++++++++++++++++++++++++++
pisshoff-timescaledb-exporter/src/config.rs | 39 +++++++++++++++++++++++++++++++++++++++
pisshoff-timescaledb-exporter/src/main.rs | 156 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
pisshoff-types/src/audit.rs | 162 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
pisshoff-types/src/lib.rs | 4 ++++
26 files changed, 1931 insertions(+), 1150 deletions(-)
@@ -22,6 +22,15 @@
]
[[package]]
name = "aho-corasick"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "43f6cb1bf222025340178f382c426f13757b2960e89779dfcb319c32542a5a41"
dependencies = [
"memchr",
]
[[package]]
name = "anstream"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -75,12 +84,29 @@
version = "1.0.71"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c7d0618f0e0b7e8ff11427422b64564d5fb0be1940354bfe2e0529b18a9d9b8"
[[package]]
name = "async-trait"
version = "0.1.68"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b9ccdd8f2a161be9bd5c023df56f1b2a0bd1d83872ae53b71a84a12c9bf6e842"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.20",
]
[[package]]
name = "autocfg"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
[[package]]
name = "base64"
version = "0.21.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "604178f6c5c21f02dc555784810edfb88d34ac2c73b2eae109655649ee73ce3d"
[[package]]
name = "base64ct"
@@ -232,7 +258,7 @@
"heck",
"proc-macro2",
"quote",
"syn",
"syn 2.0.20",
]
[[package]]
@@ -309,8 +335,44 @@
version = "2.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c2e66c9d817f1720209181c316d28635c050fa304f9c79e47a520882661b7308"
[[package]]
name = "deadpool"
version = "0.9.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "421fe0f90f2ab22016f32a9881be5134fdd71c65298917084b0c7477cbc3856e"
dependencies = [
"async-trait",
"deadpool-runtime",
"num_cpus",
"retain_mut",
"serde",
"tokio",
]
[[package]]
name = "deadpool-postgres"
version = "0.10.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "836a24a9d49deefe610b8b60c767a7412e9a931d79a89415cd2d2d71630ca8d7"
dependencies = [
"deadpool",
"log",
"serde",
"tokio",
"tokio-postgres",
]
[[package]]
name = "deadpool-runtime"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eaa37046cc0f6c3cc6090fbdbf73ef0b8ef4cfcc37f6befc0020f63e8cf121e1"
dependencies = [
"tokio",
]
[[package]]
name = "digest"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -382,6 +444,12 @@
"cc",
"libc",
]
[[package]]
name = "fallible-iterator"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4443176a9f2c162692bd3d352d745ef9413eec5782a80d8fd6f8a1ac692a07f7"
[[package]]
name = "fastrand"
@@ -400,6 +468,15 @@
dependencies = [
"crc32fast",
"miniz_oxide",
]
[[package]]
name = "form_urlencoded"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a62bc1cf6f830c2ec14a513a9fb124d0a213a629668a4186f329db21fe045652"
dependencies = [
"percent-encoding",
]
[[package]]
@@ -458,7 +535,7 @@
dependencies = [
"proc-macro2",
"quote",
"syn",
"syn 2.0.20",
]
[[package]]
@@ -547,6 +624,25 @@
dependencies = [
"crypto-mac",
"digest 0.9.0",
]
[[package]]
name = "hmac"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c49c37c09c17a53d937dfbb742eb3a961d65a994e6bcdcf37e7399d0cc8ab5e"
dependencies = [
"digest 0.10.7",
]
[[package]]
name = "idna"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7d20d6b07bfbc108882d88ed8e37d39636dcc260e15e30c45e6ba089610b917c"
dependencies = [
"unicode-bidi",
"unicode-normalization",
]
[[package]]
@@ -668,6 +764,15 @@
checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558"
dependencies = [
"regex-automata",
]
[[package]]
name = "md-5"
version = "0.10.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6365506850d44bff6e2fbcb5176cf63650e48bd45ef2fe2665ae1570e0f4b9ca"
dependencies = [
"digest 0.10.7",
]
[[package]]
@@ -681,6 +786,15 @@
version = "2.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d"
[[package]]
name = "memoffset"
version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5de893c32cde5f383baa4c04c5d6dbdd735cfd4a794b0debdb2bb1b421da5ff4"
dependencies = [
"autocfg",
]
[[package]]
name = "miniz_oxide"
@@ -700,6 +814,20 @@
"libc",
"wasi",
"windows-sys",
]
[[package]]
name = "nix"
version = "0.26.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bfdda3d196821d6af13126e40375cdf7da646a96114af134d5f417a9a1dc8e1a"
dependencies = [
"bitflags",
"cfg-if",
"libc",
"memoffset",
"pin-utils",
"static_assertions",
]
[[package]]
@@ -812,7 +940,7 @@
dependencies = [
"base64ct",
"crypto-mac",
"hmac",
"hmac 0.11.0",
"password-hash",
"sha2 0.9.9",
]
@@ -824,6 +952,30 @@
checksum = "f0ca0b5a68607598bf3bad68f32227a8164f6254833f84eafaac409cd6746c31"
dependencies = [
"digest 0.10.7",
]
[[package]]
name = "percent-encoding"
version = "2.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b2a4787296e9989611394c33f193f676704af1686e70b8f8033ab5ba9a35a94"
[[package]]
name = "phf"
version = "0.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ade2d8b8f33c7333b51bcf0428d37e217e9f32192ae4772156f65063b8ce03dc"
dependencies = [
"phf_shared",
]
[[package]]
name = "phf_shared"
version = "0.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "90fcb95eef784c2ac79119d1dd819e162b5da872ce6f3c3abe1e8ca1c082f72b"
dependencies = [
"siphasher",
]
[[package]]
@@ -839,7 +991,7 @@
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "pisshoff"
name = "pisshoff-server"
version = "0.1.0"
dependencies = [
"anyhow",
@@ -847,17 +999,48 @@
"fastrand",
"futures",
"itertools",
"nix",
"parking_lot",
"pisshoff-types",
"serde",
"serde_json",
"shlex",
"thrussh",
"thrussh-keys",
"time",
"tokio",
"toml",
"tracing",
"tracing-subscriber",
"uuid",
]
[[package]]
name = "pisshoff-timescaledb-exporter"
version = "0.1.0"
dependencies = [
"anyhow",
"clap",
"deadpool-postgres",
"futures",
"pisshoff-types",
"refinery",
"serde",
"serde_json",
"tokio",
"tokio-postgres",
"tokio-util",
"toml",
"tracing",
"tracing-subscriber",
]
[[package]]
name = "pisshoff-types"
version = "0.1.0"
dependencies = [
"serde",
"strum",
"time",
"uuid",
]
@@ -866,6 +1049,39 @@
version = "0.3.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "26072860ba924cbfa98ea39c8c19b4dd6a4a25423dbdf219c1eca91aa0cf6964"
[[package]]
name = "postgres-protocol"
version = "0.6.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "78b7fa9f396f51dffd61546fd8573ee20592287996568e6175ceb0f8699ad75d"
dependencies = [
"base64",
"byteorder",
"bytes",
"fallible-iterator",
"hmac 0.12.1",
"md-5",
"memchr",
"rand",
"sha2 0.10.7",
"stringprep",
]
[[package]]
name = "postgres-types"
version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f028f05971fe20f512bcc679e2c10227e57809a3af86a7606304435bc8896cd6"
dependencies = [
"bytes",
"fallible-iterator",
"postgres-protocol",
"serde",
"serde_json",
"time",
"uuid",
]
[[package]]
name = "ppv-lite86"
@@ -947,7 +1163,52 @@
dependencies = [
"getrandom",
"redox_syscall 0.2.16",
"thiserror",
]
[[package]]
name = "refinery"
version = "0.8.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cdb0436d0dd7bd8d4fce1e828751fa79742b08e35f27cfea7546f8a322b5ef24"
dependencies = [
"refinery-core",
"refinery-macros",
]
[[package]]
name = "refinery-core"
version = "0.8.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "19206547cd047e8f4dfa6b20c30d3ecaf24be05841b6aa0aa926a47a3d0662bb"
dependencies = [
"async-trait",
"cfg-if",
"lazy_static",
"log",
"regex",
"serde",
"siphasher",
"thiserror",
"time",
"tokio",
"tokio-postgres",
"toml",
"url",
"walkdir",
]
[[package]]
name = "refinery-macros"
version = "0.8.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d94d4b9241859ba19eaa5c04c86e782eb3aa0aae2c5868e0cfa90c856e58a174"
dependencies = [
"proc-macro2",
"quote",
"refinery-core",
"regex",
"syn 2.0.20",
]
[[package]]
@@ -956,6 +1217,8 @@
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d0ab3ca65655bb1e41f2a8c8cd662eb4fb035e67c3f78da1d61dffe89d07300f"
dependencies = [
"aho-corasick",
"memchr",
"regex-syntax 0.7.2",
]
@@ -979,6 +1242,12 @@
version = "0.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "436b050e76ed2903236f032a59761c1eb99e1b0aead2c257922771dab1fc8c78"
[[package]]
name = "retain_mut"
version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4389f1d5789befaf6029ebd9f7dac4af7f7e3d61b69d4f30e2ac02b57e7712b0"
[[package]]
name = "rustix"
@@ -993,6 +1262,12 @@
"linux-raw-sys",
"windows-sys",
]
[[package]]
name = "rustversion"
version = "1.0.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4f3208ce4d8448b3f3e7d168a73f5e0c43a61e32930de3bceeccedb388b6bf06"
[[package]]
name = "ryu"
@@ -1032,7 +1307,7 @@
dependencies = [
"proc-macro2",
"quote",
"syn",
"syn 2.0.20",
]
[[package]]
@@ -1102,6 +1377,12 @@
dependencies = [
"libc",
]
[[package]]
name = "siphasher"
version = "0.3.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7bd3e3206899af3f8b12af284fafc038cc1dc2b41d1b89dd17297221c5d225de"
[[package]]
name = "slab"
@@ -1126,6 +1407,32 @@
dependencies = [
"libc",
"winapi",
]
[[package]]
name = "socket2"
version = "0.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2538b18701741680e0322a2302176d3253a35388e2e62f172f64f4f16605f877"
dependencies = [
"libc",
"windows-sys",
]
[[package]]
name = "static_assertions"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f"
[[package]]
name = "stringprep"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ee348cb74b87454fff4b551cbf727025810a004f88aeacae7f85b87f4e9a1c1"
dependencies = [
"unicode-bidi",
"unicode-normalization",
]
[[package]]
@@ -1133,12 +1440,45 @@
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623"
[[package]]
name = "strum"
version = "0.24.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "063e6045c0e62079840579a7e47a355ae92f60eb74daaf156fb1e84ba164e63f"
dependencies = [
"strum_macros",
]
[[package]]
name = "strum_macros"
version = "0.24.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e385be0d24f186b4ce2f9982191e7101bb737312ad61c1f2f984f34bcf85d59"
dependencies = [
"heck",
"proc-macro2",
"quote",
"rustversion",
"syn 1.0.109",
]
[[package]]
name = "subtle"
version = "2.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6bdef32e8150c2a081110b42772ffe7d7c9032b606bc226c8260fd97e0976601"
[[package]]
name = "syn"
version = "1.0.109"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "72b64191b275b66ffe2469e8af2c1cfe3bafa67b529ead792a6d0160888b4237"
dependencies = [
"proc-macro2",
"quote",
"unicode-ident",
]
[[package]]
name = "syn"
@@ -1168,7 +1508,7 @@
dependencies = [
"proc-macro2",
"quote",
"syn",
"syn 2.0.20",
]
[[package]]
@@ -1216,7 +1556,7 @@
"data-encoding",
"dirs",
"futures",
"hmac",
"hmac 0.11.0",
"log",
"md5",
"num-bigint",
@@ -1271,7 +1611,22 @@
checksum = "372950940a5f07bf38dbe211d7283c9e6d7327df53794992d293e534c733d09b"
dependencies = [
"time-core",
]
[[package]]
name = "tinyvec"
version = "1.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "87cc5ceb3875bb20c2890005a4e226a4651264a5c75edb2421b52861a0a0cb50"
dependencies = [
"tinyvec_macros",
]
[[package]]
name = "tinyvec_macros"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
[[package]]
name = "tokio"
@@ -1287,7 +1642,7 @@
"parking_lot",
"pin-project-lite",
"signal-hook-registry",
"socket2",
"socket2 0.4.9",
"tokio-macros",
"windows-sys",
]
@@ -1300,7 +1655,31 @@
dependencies = [
"proc-macro2",
"quote",
"syn",
"syn 2.0.20",
]
[[package]]
name = "tokio-postgres"
version = "0.7.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e89f6234aa8fd43779746012fcf53603cdb91fdd8399aa0de868c2d56b6dde1"
dependencies = [
"async-trait",
"byteorder",
"bytes",
"fallible-iterator",
"futures-channel",
"futures-util",
"log",
"parking_lot",
"percent-encoding",
"phf",
"pin-project-lite",
"postgres-protocol",
"postgres-types",
"socket2 0.5.3",
"tokio",
"tokio-util",
]
[[package]]
@@ -1308,10 +1687,24 @@
version = "0.1.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "397c988d37662c7dda6d2208364a706264bf3d6138b11d436cbac0ad38832842"
dependencies = [
"futures-core",
"pin-project-lite",
"tokio",
]
[[package]]
name = "tokio-util"
version = "0.7.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "806fe8c2c87eccc8b3267cbae29ed3ab2d0bd37fca70ab622e46aaa9375ddb7d"
dependencies = [
"bytes",
"futures-core",
"futures-sink",
"pin-project-lite",
"tokio",
"tracing",
]
[[package]]
@@ -1368,7 +1761,7 @@
dependencies = [
"proc-macro2",
"quote",
"syn",
"syn 2.0.20",
]
[[package]]
@@ -1415,12 +1808,38 @@
version = "1.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "497961ef93d974e23eb6f433eb5fe1b7930b659f06d12dec6fc44a8f554c0bba"
[[package]]
name = "unicode-bidi"
version = "0.3.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "92888ba5573ff080736b3648696b70cafad7d250551175acbaa4e0385b3e1460"
[[package]]
name = "unicode-ident"
version = "1.0.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b15811caf2415fb889178633e7724bad2509101cde276048e013b9def5e51fa0"
[[package]]
name = "unicode-normalization"
version = "0.1.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5c5713f0fc4b5db668a2ac63cdb7bb4469d8c9fed047b1d0292cc7b0ce2ba921"
dependencies = [
"tinyvec",
]
[[package]]
name = "url"
version = "2.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "50bff7831e19200a85b17131d085c25d7811bc4e186efdaf54bbd132994a88cb"
dependencies = [
"form_urlencoded",
"idna",
"percent-encoding",
]
[[package]]
name = "utf8parse"
@@ -1,28 +1,9 @@
[package]
name = "pisshoff"
version = "0.1.0"
edition = "2021"
[dependencies]
anyhow = "1.0"
clap = { version = "4.3", features = ["derive", "env", "cargo"] }
futures = "0.3"
parking_lot = "0.12"
fastrand = "1.9"
itertools = "0.10"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
shlex = "1.1"
thrussh = "0.34"
thrussh-keys = "0.22"
time = { version = "0.3", features = ["serde", "formatting"] }
tokio = { version = "1.28", features = ["full"] }
toml = "0.7"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
uuid = { version = "1.3", features = ["v4", "serde"] }
[workspace]
members = [
"pisshoff-server",
"pisshoff-timescaledb-exporter",
"pisshoff-types"
]
[patch."crates-io"]
thrussh = { git = "ssh://git@github.com/JordanForks/thrussh" }
@@ -1,10 +1,0 @@
listen-address = "127.0.0.1:2233"
access-probability = 0.2
audit-output-file = "audit.jsonl"
@@ -14,33 +14,34 @@
{
formatter = nixpkgs.legacyPackages."${system}".nixpkgs-fmt;
defaultPackage = naersk-lib.buildPackage {
packages.default = naersk-lib.buildPackage {
src = ./.;
nativeBuildInputs = with pkgs; [ pkg-config ];
buildInputs = with pkgs; [ libsodium ];
};
devShell = with pkgs; mkShell {
devShells.default = with pkgs; mkShell {
buildInputs = [ cargo rustc rustfmt pre-commit rustPackages.clippy ];
RUST_SRC_PATH = rustPlatform.rustLibSrc;
};
nixosModules.default = { config, lib, pkgs, ... }:
nixosModules.pisshoff-server = { config, lib, pkgs, ... }:
with lib;
let
cfg = config.services.pisshoff;
cfg = config.services.pisshoff-server;
in
{
options.services.pisshoff = {
enable = mkEnableOption "pisshoff";
options.services.pisshoff-server = {
enable = mkEnableOption "pisshoff-server";
settings = mkOption {
type = (pkgs.formats.toml { }).type;
default = { };
description = "Specify the configuration for pisshoff in Nix";
description = "Specify the configuration for pisshoff-server in Nix";
};
};
config = mkIf cfg.enable {
systemd.services.pisshoff = {
systemd.services.pisshoff-server = {
enable = true;
wantedBy = [ "multi-user.target" ];
after = [ "network-online.target" ];
@@ -51,7 +52,7 @@
in
{
Type = "exec";
ExecStart = "${self.defaultPackage."${system}"}/bin/pisshoff -c \"${conf}\"";
ExecStart = "${self.packages."${system}".default}/bin/pisshoff-server -c \"${conf}\"";
ExecReload = "${pkgs.coreutils}/bin/kill -HUP $MAINPID";
Restart = "on-failure";
@@ -84,7 +85,7 @@
};
};
services.logrotate.settings.pisshoff = {
services.logrotate.settings.pisshoff-server = {
files = "/var/log/pisshoff/audit.log";
rotate = 31;
frequency = "daily";
@@ -93,6 +94,69 @@
missingok = true;
notifempty = true;
postrotate = "systemctl reload pisshoff";
};
};
};
nixosModules.pisshoff-timescaledb-exporter = { config, lib, pkgs, ... }:
with lib;
let
cfg = config.services.pisshoff-timescaledb-exporter;
in
{
options.services.pisshoff-timescaledb-exporter = {
enable = mkEnableOption "pisshoff-timescaledb-exporter";
settings = mkOption {
type = (pkgs.formats.toml { }).type;
default = { };
description = "Specify the configuration for pisshoff-timescaledb-exporter in Nix";
};
};
config = mkIf cfg.enable {
systemd.services.pisshoff-timescaledb-exporter = {
enable = true;
wantedBy = [ "multi-user.target" ];
after = [ "network-online.target" ];
serviceConfig =
let
format = pkgs.formats.toml { };
conf = format.generate "pisshoff.toml" cfg.settings;
in
{
Type = "exec";
ExecStart = "${self.packages."${system}".default}/bin/pisshoff-timescaledb-exporter -c \"${conf}\"";
ExecReload = "${pkgs.coreutils}/bin/kill -HUP $MAINPID";
Restart = "on-failure";
RuntimeDirectory = "pisshoff-timescaledb-exporter";
LogsDirectory = "pisshoff-timescaledb-exporter";
CapabilityBoundingSet = "";
NoNewPrivileges = true;
PrivateDevices = true;
PrivateTmp = true;
PrivateUsers = true;
PrivateMounts = true;
ProtectHome = true;
ProtectClock = true;
ProtectProc = "invisible";
ProcSubset = "pid";
ProtectKernelLogs = true;
ProtectKernelModules = true;
ProtectKernelTunables = true;
ProtectControlGroups = true;
ProtectHostname = true;
ProtectSystem = "strict";
RestrictSUIDSGID = true;
RestrictRealtime = true;
RestrictNamespaces = true;
LockPersonality = true;
RemoveIPC = true;
MemoryDenyWriteExecute = true;
DynamicUser = true;
RestrictAddressFamilies = [ "AF_UNIX" "AF_INET" "AF_INET6" ];
SystemCallFilter = [ "@system-service" "~@privileged" ];
};
};
};
};
@@ -1,0 +1,27 @@
[package]
name = "pisshoff-server"
version = "0.1.0"
edition = "2021"
[dependencies]
pisshoff-types = { path = "../pisshoff-types" }
anyhow = "1.0"
clap = { version = "4.3", features = ["derive", "env", "cargo"] }
futures = "0.3"
parking_lot = "0.12"
fastrand = "1.9"
itertools = "0.10"
nix = { version = "0.26", features = ["hostname"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
shlex = "1.1"
thrussh = "0.34"
thrussh-keys = "0.22"
tokio = { version = "1.28", features = ["full"] }
toml = "0.7"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
uuid = { version = "1.3", features = ["v4", "serde"] }
@@ -1,0 +1,10 @@
listen-address = "127.0.0.1:2233"
access-probability = 0.2
audit-output-file = "audit.jsonl"
@@ -1,0 +1,23 @@
[package]
name = "pisshoff-timescaledb-exporter"
version = "0.1.0"
edition = "2021"
[dependencies]
pisshoff-types = { path = "../pisshoff-types" }
anyhow = "1.0"
clap = { version = "4.3", features = ["derive", "env", "cargo"] }
deadpool-postgres = { version = "0.10", features = ["rt_tokio_1", "serde"] }
futures = "0.3"
refinery = { version = "0.8", features = ["tokio-postgres"] }
tokio = { version = "1.28", features = ["full"] }
tokio-util = { version = "0.7", features = ["codec"] }
tokio-postgres = { version = "0.7", features = ["with-time-0_3", "with-uuid-1", "with-serde_json-1"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
toml = "0.7"
@@ -1,0 +1,7 @@
socket-path = "test.sock"
[pg]
user = "postgres"
dbname = "pisshoff"
host = "127.0.0.1"
port = 64601
@@ -1,0 +1,12 @@
[package]
name = "pisshoff-types"
version = "0.1.0"
edition = "2021"
[dependencies]
uuid = "1.3"
time = { version = "0.3", features = ["serde", "formatting", "parsing"] }
serde = { version = "1.0", features = ["derive"] }
strum = { version = "0.24", features = ["derive"] }
@@ -1,233 +1,0 @@
use crate::config::Config;
use serde::Serialize;
use std::{
fmt::{Debug, Formatter},
io::ErrorKind,
net::SocketAddr,
sync::Arc,
time::{Duration, Instant},
};
use time::OffsetDateTime;
use tokio::{
fs::OpenOptions,
io::{AsyncWriteExt, BufWriter},
sync::{oneshot, watch},
task::JoinHandle,
};
use tracing::{debug, info};
use uuid::Uuid;
pub fn start_audit_writer(
config: Arc<Config>,
mut reload: watch::Receiver<()>,
mut shutdown_recv: oneshot::Receiver<()>,
) -> (
tokio::sync::mpsc::UnboundedSender<AuditLog>,
JoinHandle<Result<(), std::io::Error>>,
) {
let (send, mut recv) = tokio::sync::mpsc::unbounded_channel();
let handle = tokio::spawn(async move {
let open_writer = || async {
let file = OpenOptions::default()
.create(true)
.append(true)
.open(&config.audit_output_file)
.await?;
Ok::<_, std::io::Error>(BufWriter::new(file))
};
let mut writer = open_writer().await?;
let mut shutdown = false;
while !shutdown {
tokio::select! {
log = recv.recv() => {
match log {
Some(log) => {
let log = serde_json::to_vec(&log)
.map_err(|e| std::io::Error::new(ErrorKind::Other, e))?;
writer.write_all(&log).await?;
writer.write_all("\n".as_bytes()).await?;
}
None => {
shutdown = true;
}
}
}
_ = &mut shutdown_recv => {
shutdown = true;
}
_ = tokio::time::sleep(Duration::from_secs(5)), if !writer.buffer().is_empty() => {
debug!("Flushing audits to disk");
writer.flush().await?;
}
Ok(()) = reload.changed() => {
info!("Flushing audits to disk");
writer.flush().await?;
info!("Reopening handle to log file");
writer = open_writer().await?;
info!("Successfully re-opened log file");
}
else => break,
}
}
writer.flush().await?;
Ok(())
});
(send, handle)
}
#[derive(Serialize)]
pub struct AuditLog {
pub connection_id: Uuid,
#[serde(with = "time::serde::rfc3339")]
pub ts: OffsetDateTime,
pub peer_address: Option<SocketAddr>,
#[serde(skip_serializing_if = "Vec::is_empty")]
pub environment_variables: Vec<(Box<str>, Box<str>)>,
pub events: Vec<AuditLogEvent>,
#[serde(skip)]
pub start: Instant,
}
impl Default for AuditLog {
fn default() -> Self {
Self {
connection_id: Uuid::default(),
ts: OffsetDateTime::now_utc(),
peer_address: None,
environment_variables: vec![],
events: vec![],
start: Instant::now(),
}
}
}
impl Debug for AuditLog {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("AuditLog")
.field("connection_id", &self.connection_id)
.field("peer_address", &self.peer_address)
.field("environment_variables", &self.environment_variables)
.field("events", &self.events)
.finish()
}
}
impl AuditLog {
pub fn push_action(&mut self, action: AuditLogAction) {
self.events.push(AuditLogEvent {
start_offset: self.start.elapsed(),
action,
});
}
}
#[derive(Debug, Serialize)]
pub struct AuditLogEvent {
pub start_offset: Duration,
pub action: AuditLogAction,
}
#[derive(Debug, Serialize)]
#[serde(tag = "type", rename_all = "kebab-case")]
pub enum AuditLogAction {
LoginAttempt(LoginAttemptEvent),
PtyRequest(PtyRequestEvent),
X11Request(X11RequestEvent),
OpenX11(OpenX11Event),
OpenDirectTcpIp(OpenDirectTcpIpEvent),
ExecCommand(ExecCommandEvent),
WindowAdjusted(WindowAdjustedEvent),
ShellRequested,
SubsystemRequest(SubsystemRequestEvent),
WindowChangeRequest(WindowChangeRequestEvent),
Signal(SignalEvent),
TcpIpForward(TcpIpForwardEvent),
CancelTcpIpForward(TcpIpForwardEvent),
}
#[derive(Debug, Serialize)]
pub struct ExecCommandEvent {
pub args: Box<[String]>,
}
#[derive(Debug, Serialize)]
pub struct WindowAdjustedEvent {
pub new_size: usize,
}
#[derive(Debug, Serialize)]
pub struct SubsystemRequestEvent {
pub name: Box<str>,
}
#[derive(Debug, Serialize)]
pub struct SignalEvent {
pub name: Box<str>,
}
#[derive(Debug, Serialize)]
#[serde(tag = "credential-type", rename_all = "kebab-case")]
pub enum LoginAttemptEvent {
UsernamePassword {
username: Box<str>,
password: Box<str>,
},
PublicKey {
kind: &'static str,
fingerprint: Box<str>,
},
}
#[derive(Debug, Serialize)]
pub struct PtyRequestEvent {
pub term: Box<str>,
pub col_width: u32,
pub row_height: u32,
pub pix_width: u32,
pub pix_height: u32,
pub modes: Box<[(u8, u32)]>,
}
#[derive(Debug, Serialize)]
pub struct OpenX11Event {
pub originator_address: Box<str>,
pub originator_port: u32,
}
#[derive(Debug, Serialize)]
pub struct X11RequestEvent {
pub single_connection: bool,
pub x11_auth_protocol: Box<str>,
pub x11_auth_cookie: Box<str>,
pub x11_screen_number: u32,
}
#[derive(Debug, Serialize)]
pub struct OpenDirectTcpIpEvent {
pub host_to_connect: Box<str>,
pub port_to_connect: u32,
pub originator_address: Box<str>,
pub originator_port: u32,
}
#[derive(Debug, Serialize)]
pub struct WindowChangeRequestEvent {
pub col_width: u32,
pub row_height: u32,
pub pix_width: u32,
pub pix_height: u32,
}
#[derive(Debug, Serialize)]
pub struct TcpIpForwardEvent {
pub address: Box<str>,
pub port: u32,
}
@@ -1,51 +1,0 @@
use itertools::Itertools;
use std::{f32, str::FromStr, time::Duration};
use thrussh::{server::Session, ChannelId};
pub async fn run_command(args: &[String], channel: ChannelId, session: &mut Session) {
let Some(command) = args.get(0) else {
return;
};
match command.as_str() {
"echo" => {
session.data(
channel,
format!("{}\n", args.iter().skip(1).join(" ")).into(),
);
}
"whoami" => {
session.data(channel, "root\n".to_string().into());
}
"pwd" => {
session.data(channel, "/root\n".to_string().into());
}
"ls" => {
}
"exit" => {
let exit_status = args
.get(1)
.map(String::as_str)
.map_or(Ok(0), u32::from_str)
.unwrap_or(2);
session.exit_status_request(channel, exit_status);
session.close(channel);
}
"sleep" => {
if let Some(Ok(secs)) = args.get(1).map(String::as_str).map(f32::from_str) {
tokio::time::sleep(Duration::from_secs_f32(secs)).await;
}
}
other => {
session.data(
channel,
format!("bash: {other}: command not found\n").into(),
);
}
}
}
@@ -1,72 +1,0 @@
use clap::Parser;
use serde::{de::DeserializeOwned, Deserialize};
use std::path::PathBuf;
use std::sync::Arc;
use std::{io::ErrorKind, net::SocketAddr};
#[derive(Parser)]
#[command(author, version, about, long_about = None)]
pub struct Args {
#[arg(short, long, env, value_parser = load_config::<Config>)]
pub config: Arc<Config>,
#[arg(short, long, action = clap::ArgAction::Count)]
pub verbose: u8,
}
impl Args {
pub fn verbosity(&self) -> &'static str {
match self.verbose {
0 => "info",
1 => "debug,thrussh=info",
2 => "debug",
_ => "trace",
}
}
}
#[derive(Deserialize, Clone)]
#[serde(rename_all = "kebab-case")]
pub struct Config {
#[serde(default = "Config::default_listen_address")]
pub listen_address: SocketAddr,
#[serde(default = "Config::default_access_probability")]
pub access_probability: f64,
#[serde(default = "Config::default_audit_output_file")]
pub audit_output_file: PathBuf,
#[serde(default = "Config::default_server_id")]
pub server_id: String,
}
impl Config {
fn default_listen_address() -> SocketAddr {
"0.0.0.0:22".parse().unwrap()
}
fn default_access_probability() -> f64 {
0.2
}
fn default_audit_output_file() -> PathBuf {
"/var/log/pisshoff/audit.log".parse().unwrap()
}
fn default_server_id() -> String {
"SSH-2.0-OpenSSH_9.3".to_string()
}
}
fn load_config<T: DeserializeOwned>(path: &str) -> Result<Arc<T>, std::io::Error> {
let file = std::fs::read_to_string(path)?;
toml::from_str(&file)
.map(Arc::new)
.map_err(|e| std::io::Error::new(ErrorKind::Other, e))
}
@@ -1,103 +1,0 @@
#![deny(clippy::pedantic)]
#![allow(clippy::module_name_repetitions)]
use crate::{config::Args, server::Server};
use clap::Parser;
use futures::FutureExt;
use std::sync::Arc;
use thrussh::MethodSet;
use tokio::{
signal::unix::SignalKind,
sync::{oneshot, watch},
};
use tracing::{error, info};
use tracing_subscriber::EnvFilter;
mod audit;
mod command;
mod config;
mod server;
mod state;
#[tokio::main]
async fn main() {
if let Err(e) = run().await {
error!("Failed to run {}: {}", env!("CARGO_CRATE_NAME"), e);
std::process::exit(1);
}
}
async fn run() -> anyhow::Result<()> {
let args = Args::parse();
std::env::set_var("RUST_LOG", args.verbosity());
tracing_subscriber::fmt()
.with_env_filter(EnvFilter::from_default_env())
.init();
info!(
"{} listening on {}",
env!("CARGO_CRATE_NAME"),
args.config.listen_address
);
let keys = vec![thrussh_keys::key::KeyPair::generate_ed25519().unwrap()];
let thrussh_config = Arc::new(thrussh::server::Config {
server_id: args.config.server_id.to_string(),
methods: MethodSet::PASSWORD | MethodSet::PUBLICKEY | MethodSet::KEYBOARD_INTERACTIVE,
keys,
auth_rejection_time: std::time::Duration::from_secs(1),
..thrussh::server::Config::default()
});
let (reload_send, reload_recv) = watch::channel(());
let (shutdown_send, shutdown_recv) = oneshot::channel();
let (audit_send, audit_handle) =
audit::start_audit_writer(args.config.clone(), reload_recv, shutdown_recv);
let mut audit_handle = audit_handle.fuse();
let server = Server::new(args.config.clone(), audit_send);
let listen_address = args.config.listen_address.to_string();
let fut = thrussh::server::run(thrussh_config, &listen_address, server);
let shutdown_watcher = watch_for_shutdown(shutdown_send);
let reload_watcher = watch_for_reloads(reload_send);
tokio::select! {
res = fut => res?,
res = &mut audit_handle => res??,
res = shutdown_watcher => res?,
res = reload_watcher => res?,
}
info!("Finishing audit log writes");
audit_handle.await??;
info!("Audit log writes finished");
Ok(())
}
async fn watch_for_shutdown(send: oneshot::Sender<()>) -> Result<(), anyhow::Error> {
tokio::signal::ctrl_c().await?;
info!("Received ctrl-c, initiating shutdown");
let _res = send.send(());
Ok(())
}
async fn watch_for_reloads(send: watch::Sender<()>) -> Result<(), anyhow::Error> {
let mut signal = tokio::signal::unix::signal(SignalKind::hangup())?;
while let Some(()) = signal.recv().await {
info!("Received SIGHUP, broadcasting reload");
let _res = send.send(());
}
Ok(())
}
@@ -1,613 +1,0 @@
use crate::audit::{
ExecCommandEvent, SignalEvent, SubsystemRequestEvent, TcpIpForwardEvent, WindowAdjustedEvent,
WindowChangeRequestEvent,
};
use crate::{
audit::{
AuditLog, AuditLogAction, LoginAttemptEvent, OpenDirectTcpIpEvent, OpenX11Event,
PtyRequestEvent, X11RequestEvent,
},
command::run_command,
config::Config,
state::State,
};
use futures::{
future::{BoxFuture, InspectErr},
FutureExt, TryFutureExt,
};
use std::{
borrow::Cow,
future::Future,
net::SocketAddr,
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
use thrussh::{
server::{Auth, Response, Session},
ChannelId, Pty, Sig,
};
use thrussh_keys::key::PublicKey;
use tokio::sync::mpsc::UnboundedSender;
use tracing::{debug, error, info, info_span, instrument::Instrumented, Instrument, Span};
pub static KEYBOARD_INTERACTIVE_PROMPT: &[(Cow<'static, str>, bool)] =
&[(Cow::Borrowed("Password: "), false)];
pub const SHELL_PROMPT: &str = "bash-5.1$ ";
#[derive(Clone)]
pub struct Server {
config: Arc<Config>,
state: Arc<State>,
audit_send: UnboundedSender<AuditLog>,
}
impl Server {
pub fn new(config: Arc<Config>, audit_send: UnboundedSender<AuditLog>) -> Self {
Self {
config,
state: Arc::new(State::default()),
audit_send,
}
}
}
impl thrussh::server::Server for Server {
type Handler = Connection;
fn new(&mut self, peer_addr: Option<SocketAddr>) -> Self::Handler {
let connection_id = uuid::Uuid::new_v4();
Connection {
span: info_span!("connection", ?peer_addr, %connection_id),
server: self.clone(),
audit_log: AuditLog {
connection_id,
peer_address: peer_addr,
..AuditLog::default()
},
}
}
}
pub struct Connection {
span: Span,
server: Server,
audit_log: AuditLog,
}
impl Connection {
fn try_login(&mut self, user: &str, password: &str) -> bool {
let res = if self
.server
.state
.previously_accepted_passwords
.seen(password)
{
info!(user, password, "Accepted login due to it being used before");
true
} else if fastrand::f64() <= self.server.config.access_probability {
info!(user, password, "Accepted login randomly");
self.server
.state
.previously_accepted_passwords
.store(password);
true
} else {
info!(?user, ?password, "Rejected login");
false
};
self.audit_log.push_action(AuditLogAction::LoginAttempt(
LoginAttemptEvent::UsernamePassword {
username: Box::from(user),
password: Box::from(password),
},
));
res
}
}
impl thrussh::server::Handler for Connection {
type Error = anyhow::Error;
type FutureAuth = HandlerFuture<Auth>;
type FutureUnit = HandlerFuture<Session>;
type FutureBool =
ServerFuture<Self::Error, BoxFuture<'static, Result<(Self, Session, bool), Self::Error>>>;
fn finished_auth(self, auth: Auth) -> Self::FutureAuth {
let span = info_span!(parent: &self.span, "finished_auth");
futures::future::ok((self, auth)).boxed().wrap(span)
}
fn finished_bool(self, b: bool, session: Session) -> Self::FutureBool {
let span = info_span!(parent: &self.span, "finished_bool");
let _entered = span.enter();
futures::future::ok((self, session, b))
.boxed()
.wrap(Span::current())
}
fn finished(self, session: Session) -> Self::FutureUnit {
let span = info_span!(parent: &self.span, "finished");
let _entered = span.enter();
futures::future::ok((self, session))
.boxed()
.wrap(Span::current())
}
fn auth_none(self, _user: &str) -> Self::FutureAuth {
let span = info_span!(parent: &self.span, "auth_none");
self.finished_auth(Auth::UnsupportedMethod)
.boxed()
.wrap(span)
}
fn auth_password(mut self, user: &str, password: &str) -> Self::FutureAuth {
let span = info_span!(parent: &self.span, "auth_password");
let _entered = span.enter();
let res = if self.try_login(user, password) {
Auth::Accept
} else {
Auth::Reject
};
self.finished_auth(res)
}
fn auth_publickey(mut self, _user: &str, public_key: &PublicKey) -> Self::FutureAuth {
let span = info_span!(parent: &self.span, "auth_publickey");
let _entered = span.enter();
let kind = public_key.name();
let fingerprint = public_key.fingerprint();
self.audit_log
.push_action(AuditLogAction::LoginAttempt(LoginAttemptEvent::PublicKey {
kind,
fingerprint: Box::from(fingerprint),
}));
self.finished_auth(Auth::Reject)
.boxed()
.wrap(Span::current())
}
fn auth_keyboard_interactive(
mut self,
user: &str,
_submethods: &str,
mut response: Option<Response>,
) -> Self::FutureAuth {
let span = info_span!(parent: &self.span, "auth_keyboard_interactive");
let _entered = span.enter();
let result = if let Some(password) = response
.as_mut()
.and_then(Response::next)
.map(String::from_utf8_lossy)
{
if self.try_login(user, password.as_ref()) {
Auth::Accept
} else {
Auth::Reject
}
} else {
debug!("Client is attempting keyboard-interactive, obliging");
Auth::Partial {
name: "".into(),
instructions: "".into(),
prompts: KEYBOARD_INTERACTIVE_PROMPT.into(),
}
};
self.finished_auth(result)
}
fn channel_close(self, channel: ChannelId, mut session: Session) -> Self::FutureUnit {
let span = info_span!(parent: &self.span, "channel_close");
let _entered = span.enter();
session.channel_success(channel);
self.finished(session).boxed().wrap(Span::current())
}
fn channel_eof(self, channel: ChannelId, mut session: Session) -> Self::FutureUnit {
let span = info_span!(parent: &self.span, "channel_eof");
let _entered = span.enter();
info!("In here");
session.channel_success(channel);
self.finished(session).boxed().wrap(Span::current())
}
fn channel_open_session(self, channel: ChannelId, mut session: Session) -> Self::FutureUnit {
let span = info_span!(parent: &self.span, "channel_open_session");
let _entered = span.enter();
session.channel_success(channel);
self.finished(session).boxed().wrap(Span::current())
}
fn channel_open_x11(
mut self,
channel: ChannelId,
originator_address: &str,
originator_port: u32,
mut session: Session,
) -> Self::FutureUnit {
let span = info_span!(parent: &self.span, "channel_open_x11");
let _entered = span.enter();
self.audit_log
.push_action(AuditLogAction::OpenX11(OpenX11Event {
originator_address: Box::from(originator_address),
originator_port,
}));
session.channel_failure(channel);
self.finished(session).boxed().wrap(Span::current())
}
fn channel_open_direct_tcpip(
mut self,
channel: ChannelId,
host_to_connect: &str,
port_to_connect: u32,
originator_address: &str,
originator_port: u32,
mut session: Session,
) -> Self::FutureUnit {
let span = info_span!(parent: &self.span, "channel_open_direct_tcpip");
let _entered = span.enter();
self.audit_log
.push_action(AuditLogAction::OpenDirectTcpIp(OpenDirectTcpIpEvent {
host_to_connect: Box::from(host_to_connect),
port_to_connect,
originator_address: Box::from(originator_address),
originator_port,
}));
session.channel_failure(channel);
self.finished(session).boxed().wrap(Span::current())
}
fn data(mut self, channel: ChannelId, data: &[u8], mut session: Session) -> Self::FutureUnit {
let span = info_span!(parent: &self.span, "data");
let _entered = span.enter();
let data = shlex::split(String::from_utf8_lossy(data).as_ref());
async move {
if let Some(args) = data {
run_command(&args, channel, &mut session).await;
self.audit_log
.push_action(AuditLogAction::ExecCommand(ExecCommandEvent {
args: Box::from(args),
}));
}
session.data(channel, SHELL_PROMPT.to_string().into());
self.finished(session).await
}
.boxed()
.wrap(Span::current())
}
fn extended_data(
self,
_channel: ChannelId,
_code: u32,
_data: &[u8],
session: Session,
) -> Self::FutureUnit {
let span = info_span!(parent: &self.span, "extended_data");
let _entered = span.enter();
self.finished(session).boxed().wrap(Span::current())
}
fn window_adjusted(
mut self,
_channel: ChannelId,
new_window_size: usize,
session: Session,
) -> Self::FutureUnit {
let span = info_span!(parent: &self.span, "window_adjusted");
let _entered = span.enter();
self.audit_log
.push_action(AuditLogAction::WindowAdjusted(WindowAdjustedEvent {
new_size: new_window_size,
}));
self.finished(session).boxed().wrap(Span::current())
}
fn adjust_window(&mut self, _channel: ChannelId, current: u32) -> u32 {
let span = info_span!(parent: &self.span, "adjust_window");
let _entered = span.enter();
current
}
fn pty_request(
mut self,
channel: ChannelId,
term: &str,
col_width: u32,
row_height: u32,
pix_width: u32,
pix_height: u32,
modes: &[(Pty, u32)],
mut session: Session,
) -> Self::FutureUnit {
let span = info_span!(parent: &self.span, "pty_request");
let _entered = span.enter();
self.audit_log
.push_action(AuditLogAction::PtyRequest(PtyRequestEvent {
term: Box::from(term),
col_width,
row_height,
pix_width,
pix_height,
modes: Box::from(
modes
.iter()
.copied()
.map(|(pty, val)| (pty as u8, val))
.collect::<Vec<_>>(),
),
}));
session.channel_failure(channel);
self.finished(session).boxed().wrap(Span::current())
}
fn x11_request(
mut self,
channel: ChannelId,
single_connection: bool,
x11_auth_protocol: &str,
x11_auth_cookie: &str,
x11_screen_number: u32,
mut session: Session,
) -> Self::FutureUnit {
let span = info_span!(parent: &self.span, "x11_request");
let _entered = span.enter();
self.audit_log
.push_action(AuditLogAction::X11Request(X11RequestEvent {
single_connection,
x11_auth_protocol: Box::from(x11_auth_protocol),
x11_auth_cookie: Box::from(x11_auth_cookie),
x11_screen_number,
}));
session.channel_failure(channel);
self.finished(session).boxed().wrap(Span::current())
}
fn env_request(
mut self,
channel: ChannelId,
variable_name: &str,
variable_value: &str,
mut session: Session,
) -> Self::FutureUnit {
let span = info_span!(parent: &self.span, "env_request");
let _entered = span.enter();
self.audit_log
.environment_variables
.push((Box::from(variable_name), Box::from(variable_value)));
session.channel_success(channel);
self.finished(session).boxed().wrap(Span::current())
}
fn shell_request(mut self, channel: ChannelId, mut session: Session) -> Self::FutureUnit {
let span = info_span!(parent: &self.span, "shell_request");
let _entered = span.enter();
self.audit_log.push_action(AuditLogAction::ShellRequested);
session.data(channel, SHELL_PROMPT.to_string().into());
session.channel_success(channel);
self.finished(session).boxed().wrap(Span::current())
}
fn exec_request(
mut self,
channel: ChannelId,
data: &[u8],
mut session: Session,
) -> Self::FutureUnit {
let span = info_span!(parent: &self.span, "exec_request");
let _entered = span.enter();
let data = shlex::split(String::from_utf8_lossy(data).as_ref());
async move {
if let Some(args) = data {
run_command(&args, channel, &mut session).await;
self.audit_log
.push_action(AuditLogAction::ExecCommand(ExecCommandEvent {
args: Box::from(args),
}));
session.channel_success(channel);
} else {
session.channel_failure(channel);
}
self.finished(session).await
}
.boxed()
.wrap(Span::current())
}
fn subsystem_request(
mut self,
channel: ChannelId,
name: &str,
mut session: Session,
) -> Self::FutureUnit {
let span = info_span!(parent: &self.span, "subsystem_request");
let _entered = span.enter();
self.audit_log
.push_action(AuditLogAction::SubsystemRequest(SubsystemRequestEvent {
name: Box::from(name),
}));
session.channel_failure(channel);
self.finished(session).boxed().wrap(Span::current())
}
fn window_change_request(
mut self,
channel: ChannelId,
col_width: u32,
row_height: u32,
pix_width: u32,
pix_height: u32,
mut session: Session,
) -> Self::FutureUnit {
let span = info_span!(parent: &self.span, "window_change_request");
let _entered = span.enter();
self.audit_log
.push_action(AuditLogAction::WindowChangeRequest(
WindowChangeRequestEvent {
col_width,
row_height,
pix_width,
pix_height,
},
));
session.channel_success(channel);
self.finished(session).boxed().wrap(Span::current())
}
fn signal(
mut self,
_channel: ChannelId,
signal_name: Sig,
session: Session,
) -> Self::FutureUnit {
let span = info_span!(parent: &self.span, "signal");
let _entered = span.enter();
self.audit_log
.push_action(AuditLogAction::Signal(SignalEvent {
name: format!("{signal_name:?}").into(),
}));
self.finished(session).boxed().wrap(Span::current())
}
fn tcpip_forward(mut self, address: &str, port: u32, session: Session) -> Self::FutureBool {
let span = info_span!(parent: &self.span, "tcpip_forward");
let _entered = span.enter();
self.audit_log
.push_action(AuditLogAction::TcpIpForward(TcpIpForwardEvent {
address: Box::from(address),
port,
}));
self.finished_bool(false, session)
.boxed()
.wrap(Span::current())
}
fn cancel_tcpip_forward(
mut self,
address: &str,
port: u32,
session: Session,
) -> Self::FutureBool {
let span = info_span!(parent: &self.span, "cancel_tcpip_forward");
let _entered = span.enter();
self.audit_log
.push_action(AuditLogAction::CancelTcpIpForward(TcpIpForwardEvent {
address: Box::from(address),
port,
}));
self.finished_bool(false, session)
.boxed()
.wrap(Span::current())
}
}
impl Drop for Connection {
fn drop(&mut self) {
let span = info_span!(parent: &self.span, "drop");
let _entered = span.enter();
info!("Connection closed");
let _res = self
.server
.audit_send
.send(std::mem::take(&mut self.audit_log));
}
}
type HandlerResult<T> = Result<T, <Connection as thrussh::server::Handler>::Error>;
type HandlerFuture<T> = ServerFuture<
<Connection as thrussh::server::Handler>::Error,
BoxFuture<'static, HandlerResult<(Connection, T)>>,
>;
pub trait WrapFuture: Sized {
type Ok;
type Err;
fn wrap(self, span: Span) -> ServerFuture<Self::Err, Self>;
}
impl<T, F: Future<Output = Result<T, anyhow::Error>>> WrapFuture for F {
type Ok = T;
type Err = anyhow::Error;
fn wrap(self, span: Span) -> ServerFuture<Self::Err, Self> {
ServerFuture(
self.inspect_err(log_err as fn(&anyhow::Error))
.instrument(span),
)
}
}
fn log_err(e: &anyhow::Error) {
error!("Connection closed due to: {}", e);
}
#[allow(clippy::type_complexity)]
pub struct ServerFuture<E, F>(Instrumented<InspectErr<F, fn(&E)>>);
impl<T, E, F: Future<Output = Result<T, E>> + Unpin> Future for ServerFuture<E, F> {
type Output = F::Output;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Pin::new(&mut self.0).poll(cx)
}
}
@@ -1,22 +1,0 @@
use parking_lot::RwLock;
use std::collections::HashSet;
#[derive(Default)]
pub struct State {
pub previously_accepted_passwords: StoredPasswords,
}
#[derive(Default)]
pub struct StoredPasswords(RwLock<HashSet<Box<str>>>);
impl StoredPasswords {
pub fn seen(&self, password: &str) -> bool {
self.0.read().contains(password)
}
pub fn store(&self, password: &str) -> bool {
self.0.write().insert(Box::from(password.to_string()))
}
}
@@ -1,0 +1,76 @@
use crate::config::Config;
pub use pisshoff_types::audit::*;
use std::{io::ErrorKind, sync::Arc, time::Duration};
use tokio::{
fs::OpenOptions,
io::{AsyncWriteExt, BufWriter},
sync::{oneshot, watch},
task::JoinHandle,
};
use tracing::{debug, info};
pub fn start_audit_writer(
config: Arc<Config>,
mut reload: watch::Receiver<()>,
mut shutdown_recv: oneshot::Receiver<()>,
) -> (
tokio::sync::mpsc::UnboundedSender<AuditLog>,
JoinHandle<Result<(), std::io::Error>>,
) {
let (send, mut recv) = tokio::sync::mpsc::unbounded_channel();
let handle = tokio::spawn(async move {
let open_writer = || async {
let file = OpenOptions::default()
.create(true)
.append(true)
.open(&config.audit_output_file)
.await?;
Ok::<_, std::io::Error>(BufWriter::new(file))
};
let mut writer = open_writer().await?;
let mut shutdown = false;
while !shutdown {
tokio::select! {
log = recv.recv() => {
match log {
Some(log) => {
let log = serde_json::to_vec(&log)
.map_err(|e| std::io::Error::new(ErrorKind::Other, e))?;
writer.write_all(&log).await?;
writer.write_all("\n".as_bytes()).await?;
}
None => {
shutdown = true;
}
}
}
_ = &mut shutdown_recv => {
shutdown = true;
}
_ = tokio::time::sleep(Duration::from_secs(5)), if !writer.buffer().is_empty() => {
debug!("Flushing audits to disk");
writer.flush().await?;
}
Ok(()) = reload.changed() => {
info!("Flushing audits to disk");
writer.flush().await?;
info!("Reopening handle to log file");
writer = open_writer().await?;
info!("Successfully re-opened log file");
}
else => break,
}
}
writer.flush().await?;
Ok(())
});
(send, handle)
}
@@ -1,0 +1,51 @@
use itertools::Itertools;
use std::{f32, str::FromStr, time::Duration};
use thrussh::{server::Session, ChannelId};
pub async fn run_command(args: &[String], channel: ChannelId, session: &mut Session) {
let Some(command) = args.get(0) else {
return;
};
match command.as_str() {
"echo" => {
session.data(
channel,
format!("{}\n", args.iter().skip(1).join(" ")).into(),
);
}
"whoami" => {
session.data(channel, "root\n".to_string().into());
}
"pwd" => {
session.data(channel, "/root\n".to_string().into());
}
"ls" => {
}
"exit" => {
let exit_status = args
.get(1)
.map(String::as_str)
.map_or(Ok(0), u32::from_str)
.unwrap_or(2);
session.exit_status_request(channel, exit_status);
session.close(channel);
}
"sleep" => {
if let Some(Ok(secs)) = args.get(1).map(String::as_str).map(f32::from_str) {
tokio::time::sleep(Duration::from_secs_f32(secs)).await;
}
}
other => {
session.data(
channel,
format!("bash: {other}: command not found\n").into(),
);
}
}
}
@@ -1,0 +1,72 @@
use clap::Parser;
use serde::{de::DeserializeOwned, Deserialize};
use std::path::PathBuf;
use std::sync::Arc;
use std::{io::ErrorKind, net::SocketAddr};
#[derive(Parser)]
#[command(author, version, about, long_about = None)]
pub struct Args {
#[arg(short, long, env, value_parser = load_config::<Config>)]
pub config: Arc<Config>,
#[arg(short, long, action = clap::ArgAction::Count)]
pub verbose: u8,
}
impl Args {
pub fn verbosity(&self) -> &'static str {
match self.verbose {
0 => "info",
1 => "debug,thrussh=info",
2 => "debug",
_ => "trace",
}
}
}
#[derive(Deserialize, Clone)]
#[serde(rename_all = "kebab-case")]
pub struct Config {
#[serde(default = "Config::default_listen_address")]
pub listen_address: SocketAddr,
#[serde(default = "Config::default_access_probability")]
pub access_probability: f64,
#[serde(default = "Config::default_audit_output_file")]
pub audit_output_file: PathBuf,
#[serde(default = "Config::default_server_id")]
pub server_id: String,
}
impl Config {
fn default_listen_address() -> SocketAddr {
"0.0.0.0:22".parse().unwrap()
}
fn default_access_probability() -> f64 {
0.2
}
fn default_audit_output_file() -> PathBuf {
"/var/log/pisshoff/audit.log".parse().unwrap()
}
fn default_server_id() -> String {
"SSH-2.0-OpenSSH_9.3".to_string()
}
}
fn load_config<T: DeserializeOwned>(path: &str) -> Result<Arc<T>, std::io::Error> {
let file = std::fs::read_to_string(path)?;
toml::from_str(&file)
.map(Arc::new)
.map_err(|e| std::io::Error::new(ErrorKind::Other, e))
}
@@ -1,0 +1,110 @@
#![deny(clippy::pedantic)]
#![allow(clippy::module_name_repetitions)]
use crate::{config::Args, server::Server};
use anyhow::anyhow;
use clap::Parser;
use futures::FutureExt;
use std::sync::Arc;
use thrussh::MethodSet;
use tokio::{
signal::unix::SignalKind,
sync::{oneshot, watch},
};
use tracing::{error, info};
use tracing_subscriber::EnvFilter;
mod audit;
mod command;
mod config;
mod server;
mod state;
#[tokio::main]
async fn main() {
if let Err(e) = run().await {
error!("Failed to run {}: {}", env!("CARGO_CRATE_NAME"), e);
std::process::exit(1);
}
}
async fn run() -> anyhow::Result<()> {
let args = Args::parse();
std::env::set_var("RUST_LOG", args.verbosity());
tracing_subscriber::fmt()
.with_env_filter(EnvFilter::from_default_env())
.init();
info!(
"{} listening on {}",
env!("CARGO_CRATE_NAME"),
args.config.listen_address
);
let hostname = Box::leak(
nix::unistd::gethostname()?
.into_string()
.map_err(|_| anyhow!("invalid hostname"))?
.into_boxed_str(),
);
let keys = vec![thrussh_keys::key::KeyPair::generate_ed25519().unwrap()];
let thrussh_config = Arc::new(thrussh::server::Config {
server_id: args.config.server_id.to_string(),
methods: MethodSet::PASSWORD | MethodSet::PUBLICKEY | MethodSet::KEYBOARD_INTERACTIVE,
keys,
auth_rejection_time: std::time::Duration::from_secs(1),
..thrussh::server::Config::default()
});
let (reload_send, reload_recv) = watch::channel(());
let (shutdown_send, shutdown_recv) = oneshot::channel();
let (audit_send, audit_handle) =
audit::start_audit_writer(args.config.clone(), reload_recv, shutdown_recv);
let mut audit_handle = audit_handle.fuse();
let server = Server::new(hostname, args.config.clone(), audit_send);
let listen_address = args.config.listen_address.to_string();
let fut = thrussh::server::run(thrussh_config, &listen_address, server);
let shutdown_watcher = watch_for_shutdown(shutdown_send);
let reload_watcher = watch_for_reloads(reload_send);
tokio::select! {
res = fut => res?,
res = &mut audit_handle => res??,
res = shutdown_watcher => res?,
res = reload_watcher => res?,
}
info!("Finishing audit log writes");
audit_handle.await??;
info!("Audit log writes finished");
Ok(())
}
async fn watch_for_shutdown(send: oneshot::Sender<()>) -> Result<(), anyhow::Error> {
tokio::signal::ctrl_c().await?;
info!("Received ctrl-c, initiating shutdown");
let _res = send.send(());
Ok(())
}
async fn watch_for_reloads(send: watch::Sender<()>) -> Result<(), anyhow::Error> {
let mut signal = tokio::signal::unix::signal(SignalKind::hangup())?;
while let Some(()) = signal.recv().await {
info!("Received SIGHUP, broadcasting reload");
let _res = send.send(());
}
Ok(())
}
@@ -1,0 +1,620 @@
use crate::audit::{
ExecCommandEvent, SignalEvent, SubsystemRequestEvent, TcpIpForwardEvent, WindowAdjustedEvent,
WindowChangeRequestEvent,
};
use crate::{
audit::{
AuditLog, AuditLogAction, LoginAttemptEvent, OpenDirectTcpIpEvent, OpenX11Event,
PtyRequestEvent, X11RequestEvent,
},
command::run_command,
config::Config,
state::State,
};
use futures::{
future::{BoxFuture, InspectErr},
FutureExt, TryFutureExt,
};
use std::{
borrow::Cow,
future::Future,
net::SocketAddr,
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
use thrussh::{
server::{Auth, Response, Session},
ChannelId, Pty, Sig,
};
use thrussh_keys::key::PublicKey;
use tokio::sync::mpsc::UnboundedSender;
use tracing::{debug, error, info, info_span, instrument::Instrumented, Instrument, Span};
pub static KEYBOARD_INTERACTIVE_PROMPT: &[(Cow<'static, str>, bool)] =
&[(Cow::Borrowed("Password: "), false)];
pub const SHELL_PROMPT: &str = "bash-5.1$ ";
#[derive(Clone)]
pub struct Server {
config: Arc<Config>,
state: Arc<State>,
hostname: &'static str,
audit_send: UnboundedSender<AuditLog>,
}
impl Server {
pub fn new(
hostname: &'static str,
config: Arc<Config>,
audit_send: UnboundedSender<AuditLog>,
) -> Self {
Self {
config,
hostname,
state: Arc::new(State::default()),
audit_send,
}
}
}
impl thrussh::server::Server for Server {
type Handler = Connection;
fn new(&mut self, peer_addr: Option<SocketAddr>) -> Self::Handler {
let connection_id = uuid::Uuid::new_v4();
Connection {
span: info_span!("connection", ?peer_addr, %connection_id),
server: self.clone(),
audit_log: AuditLog {
connection_id,
host: Cow::Borrowed(self.hostname),
peer_address: peer_addr,
..AuditLog::default()
},
}
}
}
pub struct Connection {
span: Span,
server: Server,
audit_log: AuditLog,
}
impl Connection {
fn try_login(&mut self, user: &str, password: &str) -> bool {
let res = if self
.server
.state
.previously_accepted_passwords
.seen(password)
{
info!(user, password, "Accepted login due to it being used before");
true
} else if fastrand::f64() <= self.server.config.access_probability {
info!(user, password, "Accepted login randomly");
self.server
.state
.previously_accepted_passwords
.store(password);
true
} else {
info!(?user, ?password, "Rejected login");
false
};
self.audit_log.push_action(AuditLogAction::LoginAttempt(
LoginAttemptEvent::UsernamePassword {
username: Box::from(user),
password: Box::from(password),
},
));
res
}
}
impl thrussh::server::Handler for Connection {
type Error = anyhow::Error;
type FutureAuth = HandlerFuture<Auth>;
type FutureUnit = HandlerFuture<Session>;
type FutureBool =
ServerFuture<Self::Error, BoxFuture<'static, Result<(Self, Session, bool), Self::Error>>>;
fn finished_auth(self, auth: Auth) -> Self::FutureAuth {
let span = info_span!(parent: &self.span, "finished_auth");
futures::future::ok((self, auth)).boxed().wrap(span)
}
fn finished_bool(self, b: bool, session: Session) -> Self::FutureBool {
let span = info_span!(parent: &self.span, "finished_bool");
let _entered = span.enter();
futures::future::ok((self, session, b))
.boxed()
.wrap(Span::current())
}
fn finished(self, session: Session) -> Self::FutureUnit {
let span = info_span!(parent: &self.span, "finished");
let _entered = span.enter();
futures::future::ok((self, session))
.boxed()
.wrap(Span::current())
}
fn auth_none(self, _user: &str) -> Self::FutureAuth {
let span = info_span!(parent: &self.span, "auth_none");
self.finished_auth(Auth::UnsupportedMethod)
.boxed()
.wrap(span)
}
fn auth_password(mut self, user: &str, password: &str) -> Self::FutureAuth {
let span = info_span!(parent: &self.span, "auth_password");
let _entered = span.enter();
let res = if self.try_login(user, password) {
Auth::Accept
} else {
Auth::Reject
};
self.finished_auth(res)
}
fn auth_publickey(mut self, _user: &str, public_key: &PublicKey) -> Self::FutureAuth {
let span = info_span!(parent: &self.span, "auth_publickey");
let _entered = span.enter();
let kind = public_key.name();
let fingerprint = public_key.fingerprint();
self.audit_log
.push_action(AuditLogAction::LoginAttempt(LoginAttemptEvent::PublicKey {
kind: Cow::Borrowed(kind),
fingerprint: Box::from(fingerprint),
}));
self.finished_auth(Auth::Reject)
.boxed()
.wrap(Span::current())
}
fn auth_keyboard_interactive(
mut self,
user: &str,
_submethods: &str,
mut response: Option<Response>,
) -> Self::FutureAuth {
let span = info_span!(parent: &self.span, "auth_keyboard_interactive");
let _entered = span.enter();
let result = if let Some(password) = response
.as_mut()
.and_then(Response::next)
.map(String::from_utf8_lossy)
{
if self.try_login(user, password.as_ref()) {
Auth::Accept
} else {
Auth::Reject
}
} else {
debug!("Client is attempting keyboard-interactive, obliging");
Auth::Partial {
name: "".into(),
instructions: "".into(),
prompts: KEYBOARD_INTERACTIVE_PROMPT.into(),
}
};
self.finished_auth(result)
}
fn channel_close(self, channel: ChannelId, mut session: Session) -> Self::FutureUnit {
let span = info_span!(parent: &self.span, "channel_close");
let _entered = span.enter();
session.channel_success(channel);
self.finished(session).boxed().wrap(Span::current())
}
fn channel_eof(self, channel: ChannelId, mut session: Session) -> Self::FutureUnit {
let span = info_span!(parent: &self.span, "channel_eof");
let _entered = span.enter();
info!("In here");
session.channel_success(channel);
self.finished(session).boxed().wrap(Span::current())
}
fn channel_open_session(self, channel: ChannelId, mut session: Session) -> Self::FutureUnit {
let span = info_span!(parent: &self.span, "channel_open_session");
let _entered = span.enter();
session.channel_success(channel);
self.finished(session).boxed().wrap(Span::current())
}
fn channel_open_x11(
mut self,
channel: ChannelId,
originator_address: &str,
originator_port: u32,
mut session: Session,
) -> Self::FutureUnit {
let span = info_span!(parent: &self.span, "channel_open_x11");
let _entered = span.enter();
self.audit_log
.push_action(AuditLogAction::OpenX11(OpenX11Event {
originator_address: Box::from(originator_address),
originator_port,
}));
session.channel_failure(channel);
self.finished(session).boxed().wrap(Span::current())
}
fn channel_open_direct_tcpip(
mut self,
channel: ChannelId,
host_to_connect: &str,
port_to_connect: u32,
originator_address: &str,
originator_port: u32,
mut session: Session,
) -> Self::FutureUnit {
let span = info_span!(parent: &self.span, "channel_open_direct_tcpip");
let _entered = span.enter();
self.audit_log
.push_action(AuditLogAction::OpenDirectTcpIp(OpenDirectTcpIpEvent {
host_to_connect: Box::from(host_to_connect),
port_to_connect,
originator_address: Box::from(originator_address),
originator_port,
}));
session.channel_failure(channel);
self.finished(session).boxed().wrap(Span::current())
}
fn data(mut self, channel: ChannelId, data: &[u8], mut session: Session) -> Self::FutureUnit {
let span = info_span!(parent: &self.span, "data");
let _entered = span.enter();
let data = shlex::split(String::from_utf8_lossy(data).as_ref());
async move {
if let Some(args) = data {
run_command(&args, channel, &mut session).await;
self.audit_log
.push_action(AuditLogAction::ExecCommand(ExecCommandEvent {
args: Box::from(args),
}));
}
session.data(channel, SHELL_PROMPT.to_string().into());
self.finished(session).await
}
.boxed()
.wrap(Span::current())
}
fn extended_data(
self,
_channel: ChannelId,
_code: u32,
_data: &[u8],
session: Session,
) -> Self::FutureUnit {
let span = info_span!(parent: &self.span, "extended_data");
let _entered = span.enter();
self.finished(session).boxed().wrap(Span::current())
}
fn window_adjusted(
mut self,
_channel: ChannelId,
new_window_size: usize,
session: Session,
) -> Self::FutureUnit {
let span = info_span!(parent: &self.span, "window_adjusted");
let _entered = span.enter();
self.audit_log
.push_action(AuditLogAction::WindowAdjusted(WindowAdjustedEvent {
new_size: new_window_size,
}));
self.finished(session).boxed().wrap(Span::current())
}
fn adjust_window(&mut self, _channel: ChannelId, current: u32) -> u32 {
let span = info_span!(parent: &self.span, "adjust_window");
let _entered = span.enter();
current
}
fn pty_request(
mut self,
channel: ChannelId,
term: &str,
col_width: u32,
row_height: u32,
pix_width: u32,
pix_height: u32,
modes: &[(Pty, u32)],
mut session: Session,
) -> Self::FutureUnit {
let span = info_span!(parent: &self.span, "pty_request");
let _entered = span.enter();
self.audit_log
.push_action(AuditLogAction::PtyRequest(PtyRequestEvent {
term: Box::from(term),
col_width,
row_height,
pix_width,
pix_height,
modes: Box::from(
modes
.iter()
.copied()
.map(|(pty, val)| (pty as u8, val))
.collect::<Vec<_>>(),
),
}));
session.channel_failure(channel);
self.finished(session).boxed().wrap(Span::current())
}
fn x11_request(
mut self,
channel: ChannelId,
single_connection: bool,
x11_auth_protocol: &str,
x11_auth_cookie: &str,
x11_screen_number: u32,
mut session: Session,
) -> Self::FutureUnit {
let span = info_span!(parent: &self.span, "x11_request");
let _entered = span.enter();
self.audit_log
.push_action(AuditLogAction::X11Request(X11RequestEvent {
single_connection,
x11_auth_protocol: Box::from(x11_auth_protocol),
x11_auth_cookie: Box::from(x11_auth_cookie),
x11_screen_number,
}));
session.channel_failure(channel);
self.finished(session).boxed().wrap(Span::current())
}
fn env_request(
mut self,
channel: ChannelId,
variable_name: &str,
variable_value: &str,
mut session: Session,
) -> Self::FutureUnit {
let span = info_span!(parent: &self.span, "env_request");
let _entered = span.enter();
self.audit_log
.environment_variables
.push((Box::from(variable_name), Box::from(variable_value)));
session.channel_success(channel);
self.finished(session).boxed().wrap(Span::current())
}
fn shell_request(mut self, channel: ChannelId, mut session: Session) -> Self::FutureUnit {
let span = info_span!(parent: &self.span, "shell_request");
let _entered = span.enter();
self.audit_log.push_action(AuditLogAction::ShellRequested);
session.data(channel, SHELL_PROMPT.to_string().into());
session.channel_success(channel);
self.finished(session).boxed().wrap(Span::current())
}
fn exec_request(
mut self,
channel: ChannelId,
data: &[u8],
mut session: Session,
) -> Self::FutureUnit {
let span = info_span!(parent: &self.span, "exec_request");
let _entered = span.enter();
let data = shlex::split(String::from_utf8_lossy(data).as_ref());
async move {
if let Some(args) = data {
run_command(&args, channel, &mut session).await;
self.audit_log
.push_action(AuditLogAction::ExecCommand(ExecCommandEvent {
args: Box::from(args),
}));
session.channel_success(channel);
} else {
session.channel_failure(channel);
}
self.finished(session).await
}
.boxed()
.wrap(Span::current())
}
fn subsystem_request(
mut self,
channel: ChannelId,
name: &str,
mut session: Session,
) -> Self::FutureUnit {
let span = info_span!(parent: &self.span, "subsystem_request");
let _entered = span.enter();
self.audit_log
.push_action(AuditLogAction::SubsystemRequest(SubsystemRequestEvent {
name: Box::from(name),
}));
session.channel_failure(channel);
self.finished(session).boxed().wrap(Span::current())
}
fn window_change_request(
mut self,
channel: ChannelId,
col_width: u32,
row_height: u32,
pix_width: u32,
pix_height: u32,
mut session: Session,
) -> Self::FutureUnit {
let span = info_span!(parent: &self.span, "window_change_request");
let _entered = span.enter();
self.audit_log
.push_action(AuditLogAction::WindowChangeRequest(
WindowChangeRequestEvent {
col_width,
row_height,
pix_width,
pix_height,
},
));
session.channel_success(channel);
self.finished(session).boxed().wrap(Span::current())
}
fn signal(
mut self,
_channel: ChannelId,
signal_name: Sig,
session: Session,
) -> Self::FutureUnit {
let span = info_span!(parent: &self.span, "signal");
let _entered = span.enter();
self.audit_log
.push_action(AuditLogAction::Signal(SignalEvent {
name: format!("{signal_name:?}").into(),
}));
self.finished(session).boxed().wrap(Span::current())
}
fn tcpip_forward(mut self, address: &str, port: u32, session: Session) -> Self::FutureBool {
let span = info_span!(parent: &self.span, "tcpip_forward");
let _entered = span.enter();
self.audit_log
.push_action(AuditLogAction::TcpIpForward(TcpIpForwardEvent {
address: Box::from(address),
port,
}));
self.finished_bool(false, session)
.boxed()
.wrap(Span::current())
}
fn cancel_tcpip_forward(
mut self,
address: &str,
port: u32,
session: Session,
) -> Self::FutureBool {
let span = info_span!(parent: &self.span, "cancel_tcpip_forward");
let _entered = span.enter();
self.audit_log
.push_action(AuditLogAction::CancelTcpIpForward(TcpIpForwardEvent {
address: Box::from(address),
port,
}));
self.finished_bool(false, session)
.boxed()
.wrap(Span::current())
}
}
impl Drop for Connection {
fn drop(&mut self) {
let span = info_span!(parent: &self.span, "drop");
let _entered = span.enter();
info!("Connection closed");
let _res = self
.server
.audit_send
.send(std::mem::take(&mut self.audit_log));
}
}
type HandlerResult<T> = Result<T, <Connection as thrussh::server::Handler>::Error>;
type HandlerFuture<T> = ServerFuture<
<Connection as thrussh::server::Handler>::Error,
BoxFuture<'static, HandlerResult<(Connection, T)>>,
>;
pub trait WrapFuture: Sized {
type Ok;
type Err;
fn wrap(self, span: Span) -> ServerFuture<Self::Err, Self>;
}
impl<T, F: Future<Output = Result<T, anyhow::Error>>> WrapFuture for F {
type Ok = T;
type Err = anyhow::Error;
fn wrap(self, span: Span) -> ServerFuture<Self::Err, Self> {
ServerFuture(
self.inspect_err(log_err as fn(&anyhow::Error))
.instrument(span),
)
}
}
fn log_err(e: &anyhow::Error) {
error!("Connection closed due to: {}", e);
}
#[allow(clippy::type_complexity)]
pub struct ServerFuture<E, F>(Instrumented<InspectErr<F, fn(&E)>>);
impl<T, E, F: Future<Output = Result<T, E>> + Unpin> Future for ServerFuture<E, F> {
type Output = F::Output;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Pin::new(&mut self.0).poll(cx)
}
}
@@ -1,0 +1,22 @@
use parking_lot::RwLock;
use std::collections::HashSet;
#[derive(Default)]
pub struct State {
pub previously_accepted_passwords: StoredPasswords,
}
#[derive(Default)]
pub struct StoredPasswords(RwLock<HashSet<Box<str>>>);
impl StoredPasswords {
pub fn seen(&self, password: &str) -> bool {
self.0.read().contains(password)
}
pub fn store(&self, password: &str) -> bool {
self.0.write().insert(Box::from(password.to_string()))
}
}
@@ -1,0 +1,30 @@
CREATE TABLE audit (
timestamp TIMESTAMPTZ NOT NULL,
connection_id UUID NOT NULL,
peer_address TEXT NOT NULL,
host TEXT NOT NULL,
UNIQUE(timestamp)
);
SELECT create_hypertable('audit', 'timestamp');
CREATE TABLE audit_environment_variables (
connection_id uuid NOT NULL,
name TEXT NOT NULL,
value TEXT NOT NULL
);
CREATE INDEX audit_environment_variables_connection_id ON audit_environment_variables USING HASH (connection_id);
CREATE INDEX audit_environment_variables_name ON audit_environment_variables USING HASH (name);
CREATE TABLE audit_events (
timestamp TIMESTAMPTZ NOT NULL,
connection_id UUID NOT NULL,
type TEXT NOT NULL,
content JSONB
);
SELECT create_hypertable('audit_events', 'timestamp');
CREATE INDEX audit_events_connection_id ON audit_events USING HASH (connection_id);
CREATE INDEX audit_events_type ON audit_events USING HASH (type);
@@ -1,0 +1,39 @@
use clap::Parser;
use serde::{de::DeserializeOwned, Deserialize};
use std::{io::ErrorKind, path::PathBuf, sync::Arc};
#[derive(Parser)]
#[command(author, version, about, long_about = None)]
pub struct Args {
#[arg(short, long, env, value_parser = load_config::<Config>)]
pub config: Arc<Config>,
#[arg(short, long, action = clap::ArgAction::Count)]
pub verbose: u8,
}
impl Args {
pub fn verbosity(&self) -> &'static str {
match self.verbose {
0 => "info",
1 => "debug,thrussh=info",
2 => "debug",
_ => "trace",
}
}
}
#[derive(Deserialize)]
#[serde(rename_all = "kebab-case")]
pub struct Config {
pub socket_path: PathBuf,
pub pg: deadpool_postgres::Config,
}
fn load_config<T: DeserializeOwned>(path: &str) -> Result<Arc<T>, std::io::Error> {
let file = std::fs::read_to_string(path)?;
toml::from_str(&file)
.map(Arc::new)
.map_err(|e| std::io::Error::new(ErrorKind::Other, e))
}
@@ -1,0 +1,156 @@
#![deny(clippy::pedantic)]
#![allow(clippy::module_name_repetitions)]
use crate::config::Args;
use clap::Parser;
use deadpool_postgres::{
tokio_postgres::{NoTls, Statement, Transaction},
GenericClient, Runtime,
};
use futures::{StreamExt, TryFutureExt};
use pisshoff_types::audit::{AuditLog, AuditLogEvent};
use std::sync::Arc;
use tokio::net::{UnixListener, UnixStream};
use tokio_util::codec::{Decoder, LinesCodec};
use tracing::{error, info};
use tracing_subscriber::EnvFilter;
mod config;
mod embedded {
use refinery::embed_migrations;
embed_migrations!();
}
pub struct Context {
db: deadpool_postgres::Pool,
}
#[tokio::main]
async fn main() {
if let Err(e) = run().await {
error!("Failed to run {}: {}", env!("CARGO_CRATE_NAME"), e);
std::process::exit(1);
}
}
async fn run() -> anyhow::Result<()> {
let args = Args::parse();
std::env::set_var("RUST_LOG", args.verbosity());
tracing_subscriber::fmt()
.with_env_filter(EnvFilter::from_default_env())
.init();
let db = args.config.pg.create_pool(Some(Runtime::Tokio1), NoTls)?;
let context = Arc::new(Context { db });
embedded::migrations::runner()
.run_async(&mut **context.db.get().await?)
.await?;
spawn_listener(&args, context).await
}
async fn spawn_listener(args: &Args, context: Arc<Context>) -> anyhow::Result<()> {
let listener = UnixListener::bind(&args.config.socket_path)?;
loop {
let (stream, remote) = listener.accept().await?;
info!(?remote, "Accepted incoming connection");
let context = context.clone();
tokio::spawn(async move {
if let Err(e) = handle_connection(stream, context).await {
error!("Connection failed: {e}");
}
});
}
}
async fn handle_connection(stream: UnixStream, context: Arc<Context>) -> anyhow::Result<()> {
let mut framed = LinesCodec::new().framed(stream);
while let Some(line) = framed.next().await.transpose()? {
let context = context.clone();
tokio::spawn(
ingest_log(context, line).inspect_err(|e| error!("Failed to ingest log: {e}")),
);
}
Ok(())
}
async fn ingest_log(context: Arc<Context>, line: String) -> anyhow::Result<()> {
let line: AuditLog = serde_json::from_str(&line)?;
let Some(peer_address) = line.peer_address else {
return Ok(());
};
let mut connection = context.db.get().await?;
let tx = connection.transaction().await?;
tokio::try_join!(
async {
tx
.execute(
"INSERT INTO audit (timestamp, connection_id, peer_address, host) VALUES ($1, $2, $3, $4)",
&[&line.ts, &line.connection_id, &peer_address.to_string(), &line.host],
)
.await
.map_err(anyhow::Error::from)
},
async {
let prepared = tx.prepare("INSERT INTO audit_environment_variables (connection_id, name, value) VALUES ($1, $2, $3)").await?;
futures::future::try_join_all(
line.environment_variables
.iter()
.map(|(key, value)| async { tx.execute(&prepared, &[key, value]).await }),
)
.await
.map_err(anyhow::Error::from)
},
async {
let prepared = tx.prepare("INSERT INTO audit_events (timestamp, connection_id, type, content) VALUES ($1, $2, $3, $4)").await?;
futures::future::try_join_all(
line.events
.iter()
.map(|event| insert_event(&tx, &prepared, &line, event)),
)
.await
}
)?;
tx.commit().await?;
Ok(())
}
async fn insert_event(
tx: &Transaction<'_>,
prepared: &Statement,
line: &AuditLog,
event: &AuditLogEvent,
) -> anyhow::Result<()> {
let ts = line.ts + event.start_offset;
tx.execute(
prepared,
&[
&ts,
&line.connection_id,
&<&'static str>::from(&event.action),
&serde_json::to_value(&event.action)?,
],
)
.await?;
Ok(())
}
@@ -1,0 +1,162 @@
use serde::{Deserialize, Serialize};
use std::borrow::Cow;
use std::{
fmt::{Debug, Formatter},
net::SocketAddr,
time::{Duration, Instant},
};
use strum::IntoStaticStr;
use time::OffsetDateTime;
use uuid::Uuid;
#[derive(Serialize, Deserialize)]
pub struct AuditLog {
pub connection_id: Uuid,
#[serde(with = "time::serde::rfc3339")]
pub ts: OffsetDateTime,
pub peer_address: Option<SocketAddr>,
pub host: Cow<'static, str>,
#[serde(skip_serializing_if = "Vec::is_empty", default)]
pub environment_variables: Vec<(Box<str>, Box<str>)>,
pub events: Vec<AuditLogEvent>,
#[serde(skip, default = "Instant::now")]
pub start: Instant,
}
impl Default for AuditLog {
fn default() -> Self {
Self {
connection_id: Uuid::default(),
ts: OffsetDateTime::now_utc(),
host: Cow::Borrowed(""),
peer_address: None,
environment_variables: vec![],
events: vec![],
start: Instant::now(),
}
}
}
impl Debug for AuditLog {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("AuditLog")
.field("connection_id", &self.connection_id)
.field("peer_address", &self.peer_address)
.field("environment_variables", &self.environment_variables)
.field("events", &self.events)
.finish()
}
}
impl AuditLog {
pub fn push_action(&mut self, action: AuditLogAction) {
self.events.push(AuditLogEvent {
start_offset: self.start.elapsed(),
action,
});
}
}
#[derive(Debug, Serialize, Deserialize)]
pub struct AuditLogEvent {
pub start_offset: Duration,
pub action: AuditLogAction,
}
#[derive(Debug, Serialize, Deserialize, IntoStaticStr)]
#[serde(tag = "type", rename_all = "kebab-case")]
#[strum(serialize_all = "kebab-case")]
pub enum AuditLogAction {
LoginAttempt(LoginAttemptEvent),
PtyRequest(PtyRequestEvent),
X11Request(X11RequestEvent),
OpenX11(OpenX11Event),
OpenDirectTcpIp(OpenDirectTcpIpEvent),
ExecCommand(ExecCommandEvent),
WindowAdjusted(WindowAdjustedEvent),
ShellRequested,
SubsystemRequest(SubsystemRequestEvent),
WindowChangeRequest(WindowChangeRequestEvent),
Signal(SignalEvent),
TcpIpForward(TcpIpForwardEvent),
CancelTcpIpForward(TcpIpForwardEvent),
}
#[derive(Debug, Serialize, Deserialize)]
pub struct ExecCommandEvent {
pub args: Box<[String]>,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct WindowAdjustedEvent {
pub new_size: usize,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct SubsystemRequestEvent {
pub name: Box<str>,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct SignalEvent {
pub name: Box<str>,
}
#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "credential-type", rename_all = "kebab-case")]
pub enum LoginAttemptEvent {
UsernamePassword {
username: Box<str>,
password: Box<str>,
},
PublicKey {
kind: Cow<'static, str>,
fingerprint: Box<str>,
},
}
#[derive(Debug, Serialize, Deserialize)]
pub struct PtyRequestEvent {
pub term: Box<str>,
pub col_width: u32,
pub row_height: u32,
pub pix_width: u32,
pub pix_height: u32,
pub modes: Box<[(u8, u32)]>,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct OpenX11Event {
pub originator_address: Box<str>,
pub originator_port: u32,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct X11RequestEvent {
pub single_connection: bool,
pub x11_auth_protocol: Box<str>,
pub x11_auth_cookie: Box<str>,
pub x11_screen_number: u32,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct OpenDirectTcpIpEvent {
pub host_to_connect: Box<str>,
pub port_to_connect: u32,
pub originator_address: Box<str>,
pub originator_port: u32,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct WindowChangeRequestEvent {
pub col_width: u32,
pub row_height: u32,
pub pix_width: u32,
pub pix_height: u32,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct TcpIpForwardEvent {
pub address: Box<str>,
pub port: u32,
}
@@ -1,0 +1,4 @@
#![deny(clippy::pedantic)]
#![allow(clippy::module_name_repetitions)]
pub mod audit;