From f656ad1f8bb9904c6b8f1612c515f4fbad726780 Mon Sep 17 00:00:00 2001 From: Jordan Doyle Date: Mon, 18 Sep 2023 01:37:42 +0100 Subject: [PATCH] Implement session endpoint --- Cargo.lock | 1 + jogre-server/Cargo.toml | 4 +++- jogre-server/config.toml | 1 + jmap-proto/src/common.rs | 10 ++++++++-- jogre-server/src/config.rs | 88 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ jogre-server/src/context.rs | 9 ++++++++- jogre-server/src/main.rs | 20 +++++++++++++++++--- jogre-server/src/store.rs | 105 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ jmap-proto/src/endpoints/session.rs | 123 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++----------- jogre-server/src/layers/auth_required.rs | 2 ++ jogre-server/src/methods/session.rs | 120 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ jogre-server/src/store/rocksdb.rs | 159 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 12 files changed, 567 insertions(+), 75 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1ceefd8..94fdf4a 100644 --- a/Cargo.lock +++ a/Cargo.lock @@ -922,6 +922,7 @@ "futures", "hex", "hmac", + "jmap-proto", "oxide-auth", "oxide-auth-axum", "rand", diff --git a/jogre-server/Cargo.toml b/jogre-server/Cargo.toml index a2bd799..2875260 100644 --- a/jogre-server/Cargo.toml +++ a/jogre-server/Cargo.toml @@ -6,6 +6,8 @@ # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +jmap-proto = { path = "../jmap-proto" } + argon2 = "0.5" askama = "0.12" axum = "0.6" @@ -25,7 +27,7 @@ toml = "0.8" tracing = "0.1" tracing-subscriber = "0.3" -url = "2.4" +url = { version = "2.4", features = ["serde"] } uuid = { version = "1.4", features = ["v4", "serde"] } serde = { version = "1.0.188", features = ["derive"] } sha3 = "0.10" diff --git a/jogre-server/config.toml b/jogre-server/config.toml index 127ee6a..7c59369 100644 --- a/jogre-server/config.toml +++ a/jogre-server/config.toml @@ -1,4 +1,5 @@ private-key = "mycoolatleast32byteprivatekey" +base-url = "http://127.0.0.1:8888" [store] type = "rocksdb" diff --git a/jmap-proto/src/common.rs b/jmap-proto/src/common.rs index 8f58053..d2db628 100644 --- a/jmap-proto/src/common.rs +++ a/jmap-proto/src/common.rs @@ -14,6 +14,12 @@ #[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize, Hash)] pub struct UnsignedInt(u64); +impl From for UnsignedInt { + fn from(value: u64) -> Self { + Self(value) + } +} + /// All record ids are assigned by the server and are immutable. /// /// Where "Id" is given as a data type, it means a "String" of at least 1 @@ -40,7 +46,7 @@ /// A good solution to these issues is to prefix every id with a single /// alphabetical character. #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Hash)] -pub struct Id<'a>(#[serde(borrow)] Cow<'a, str>); +pub struct Id<'a>(#[serde(borrow)] pub Cow<'a, str>); /// Where "Date" is given as a type, it means a string in "date-time" /// format [RFC3339]. To ensure a normalised form, the "time-secfrac" @@ -64,4 +70,4 @@ /// has changed (e.g., an account has been added or removed), so they /// need to refetch the object. #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] -pub struct SessionState<'a>(#[serde(borrow)] Cow<'a, str>); +pub struct SessionState<'a>(#[serde(borrow)] pub Cow<'a, str>); diff --git a/jogre-server/src/config.rs b/jogre-server/src/config.rs index 0e29908..c9974e9 100644 --- a/jogre-server/src/config.rs +++ a/jogre-server/src/config.rs @@ -17,4 +17,92 @@ /// path = "db" /// ``` pub store: StoreConfig, + /// Capabilities of the server as advertised to the client, and enforced + /// at the server. + #[serde(default)] + pub core_capabilities: CoreCapabilities, + /// Base URL of the server + pub base_url: url::Url, +} + +#[derive(Deserialize, Copy, Clone, Debug)] +#[serde(rename_all = "kebab-case")] +pub struct CoreCapabilities { + /// The maximum file size, in octets, that the server will accept + /// for a single file upload (for any purpose). Suggested minimum: + /// 50,000,000. + #[serde(default = "CoreCapabilities::default_max_size_upload")] + pub max_size_upload: u64, + /// The maximum number of concurrent requests the server will + /// accept to the upload endpoint. Suggested minimum: 4. + #[serde(default = "CoreCapabilities::default_max_concurrent_upload")] + pub max_concurrent_upload: u64, + /// The maximum size, in octets, that the server will accept for a + /// single request to the API endpoint. Suggested minimum: + /// 10,000,000. + #[serde(default = "CoreCapabilities::default_max_size_request")] + pub max_size_request: u64, + /// The maximum number of concurrent requests the server will + /// accept to the API endpoint. Suggested minimum: 4. + #[serde(default = "CoreCapabilities::default_max_concurrent_requests")] + pub max_concurrent_requests: u64, + /// The maximum number of method calls the server will accept in a + /// single request to the API endpoint. Suggested minimum: 16. + #[serde(default = "CoreCapabilities::default_max_calls_in_request")] + pub max_calls_in_request: u64, + /// The maximum number of objects that the client may request in a + /// single /get type method call. Suggested minimum: 500. + #[serde(default = "CoreCapabilities::default_max_objects_in_get")] + pub max_objects_in_get: u64, + /// The maximum number of objects the client may send to create, + /// update, or destroy in a single /set type method call. This is + /// the combined total, e.g., if the maximum is 10, you could not + /// create 7 objects and destroy 6, as this would be 13 actions, + /// which exceeds the limit. Suggested minimum: 500. + #[serde(default = "CoreCapabilities::default_max_objects_in_set")] + pub max_objects_in_set: u64, +} + +impl Default for CoreCapabilities { + fn default() -> Self { + Self { + max_size_upload: Self::default_max_size_upload(), + max_concurrent_upload: Self::default_max_concurrent_upload(), + max_size_request: Self::default_max_size_request(), + max_concurrent_requests: Self::default_max_concurrent_requests(), + max_calls_in_request: Self::default_max_calls_in_request(), + max_objects_in_get: Self::default_max_objects_in_get(), + max_objects_in_set: Self::default_max_objects_in_set(), + } + } +} + +impl CoreCapabilities { + const fn default_max_size_upload() -> u64 { + 50_000_000 + } + + const fn default_max_concurrent_upload() -> u64 { + 4 + } + + const fn default_max_size_request() -> u64 { + 10_000_000 + } + + const fn default_max_concurrent_requests() -> u64 { + 4 + } + + const fn default_max_calls_in_request() -> u64 { + 16 + } + + const fn default_max_objects_in_get() -> u64 { + 500 + } + + const fn default_max_objects_in_set() -> u64 { + 500 + } } diff --git a/jogre-server/src/context.rs b/jogre-server/src/context.rs index 70ffcb2..bbf431a 100644 --- a/jogre-server/src/context.rs +++ a/jogre-server/src/context.rs @@ -1,12 +1,17 @@ use std::sync::Arc; -use crate::{config::Config, store::Store}; +use crate::{ + config::{Config, CoreCapabilities}, + store::Store, +}; pub mod oauth2; pub struct Context { pub oauth2: oauth2::OAuth2, pub store: Arc, + pub base_url: url::Url, + pub core_capabilities: CoreCapabilities, } impl Context { @@ -17,6 +22,8 @@ Self { oauth2: oauth2::OAuth2::new(store.clone(), derived_keys), store, + base_url: config.base_url, + core_capabilities: config.core_capabilities, } } } diff --git a/jogre-server/src/main.rs b/jogre-server/src/main.rs index 70dbb0c..d6968bf 100644 --- a/jogre-server/src/main.rs +++ a/jogre-server/src/main.rs @@ -11,7 +11,10 @@ use rand::RngCore; use tracing::info; -use crate::{context::Context, store::UserProvider}; +use crate::{ + context::Context, + store::{AccountAccessLevel, AccountProvider, UserProvider}, +}; #[derive(Parser, Debug)] #[clap(author, version, about)] @@ -54,6 +57,17 @@ info!("User root created with password {password}"); - let root = store::User::new("root".into(), &password); - context.store.create_user(root).await.unwrap(); + let root_user = store::User::new("root".into(), &password); + let root_user_id = root_user.id; + context.store.create_user(root_user).await.unwrap(); + + let root_account = store::Account::new("root".into(), true, false); + let root_account_id = root_account.id; + context.store.create_account(root_account).await.unwrap(); + + context + .store + .attach_account_to_user(root_account_id, root_user_id, AccountAccessLevel::Owner) + .await + .unwrap(); } diff --git a/jogre-server/src/store.rs b/jogre-server/src/store.rs index 38f1af6..d01ec8b 100644 --- a/jogre-server/src/store.rs +++ a/jogre-server/src/store.rs @@ -6,14 +6,20 @@ use serde::{Deserialize, Serialize}; use uuid::Uuid; +/// A user corresponds to an actual end user that can login to the service, +/// objects aren't directly stored under users though - users are granted +/// access to a set of accounts that objects are stored under. +/// +/// Each user automatically has a "personal" account created for them. #[derive(Serialize, Deserialize)] pub struct User { - id: Uuid, + pub id: Uuid, pub username: String, password: String, } impl User { + /// Builds a new `User` with the given username and password. pub fn new(username: String, password: &str) -> Self { let password = Argon2::default() .hash_password(password.as_bytes(), &SaltString::generate(&mut OsRng)) @@ -27,6 +33,7 @@ } } + /// Verifies if the given password is valid for the user. pub fn verify_password(&self, password: &str) -> bool { let parsed_hash = PasswordHash::new(&self.password).unwrap(); Argon2::default() @@ -39,13 +46,65 @@ pub trait UserProvider { type Error; + async fn increment_seq_number_for_user(&self, user: Uuid) -> Result<(), Self::Error>; + + async fn fetch_seq_number_for_user(&self, user: Uuid) -> Result; + async fn has_any_users(&self) -> Result; async fn create_user(&self, user: User) -> Result<(), Self::Error>; async fn get_by_username(&self, username: &str) -> Result, Self::Error>; +} + +/// An entity which contains many objects, these can be shared among users. +#[derive(Serialize, Deserialize, Debug)] +pub struct Account { + /// ID of the account + pub id: Uuid, + /// A user-friendly name for the account. + pub name: String, + /// Whether or not the account is a user's primary account. + pub is_personal: bool, + /// Whether or not the entire account is read-only. + pub is_read_only: bool, +} + +impl Account { + pub fn new(name: String, is_personal: bool, is_read_only: bool) -> Self { + Self { + id: Uuid::new_v4(), + name, + is_personal, + is_read_only, + } + } } +#[async_trait] +pub trait AccountProvider { + type Error; + + /// Creates or updates an account in the data store. + async fn create_account(&self, account: Account) -> Result<(), Self::Error>; + + /// Grants a user access to an account. + async fn attach_account_to_user( + &self, + account: Uuid, + user: Uuid, + access: AccountAccessLevel, + ) -> Result<(), Self::Error>; + + /// Fetches a list of accounts for the given user. + async fn get_accounts_for_user(&self, user_id: Uuid) -> Result, Self::Error>; +} + +#[repr(u8)] +pub enum AccountAccessLevel { + Owner, +} + #[derive(Deserialize)] #[serde(tag = "type")] pub enum StoreConfig { @@ -61,6 +120,34 @@ pub fn from_config(config: StoreConfig) -> Self { match config { StoreConfig::RocksDb(config) => Self::RocksDb(rocksdb::RocksDb::new(config)), + } + } +} + +#[async_trait] +impl AccountProvider for Store { + type Error = rocksdb::Error; + + async fn create_account(&self, account: Account) -> Result<(), Self::Error> { + match self { + Store::RocksDb(db) => db.create_account(account).await, + } + } + + async fn attach_account_to_user( + &self, + account: Uuid, + user: Uuid, + access: AccountAccessLevel, + ) -> Result<(), Self::Error> { + match self { + Store::RocksDb(db) => db.attach_account_to_user(account, user, access).await, + } + } + + async fn get_accounts_for_user(&self, user_id: Uuid) -> Result, Self::Error> { + match self { + Store::RocksDb(db) => db.get_accounts_for_user(user_id).await, } } } @@ -68,19 +155,35 @@ #[async_trait] impl UserProvider for Store { type Error = rocksdb::Error; + + async fn increment_seq_number_for_user(&self, user: Uuid) -> Result<(), Self::Error> { + match self { + Store::RocksDb(db) => db.increment_seq_number_for_user(user).await, + } + } + + async fn fetch_seq_number_for_user(&self, user: Uuid) -> Result { + match self { + Store::RocksDb(db) => db.fetch_seq_number_for_user(user).await, + } + } + /// Checks if any users have been registered to decide whether a root + /// account should be created at boot. async fn has_any_users(&self) -> Result { match self { Store::RocksDb(db) => db.has_any_users().await, } } + /// Creates or updates a user in the store. async fn create_user(&self, user: User) -> Result<(), Self::Error> { match self { Store::RocksDb(db) => db.create_user(user).await, } } + /// Fetches a user by their username. async fn get_by_username(&self, username: &str) -> Result, Self::Error> { match self { Store::RocksDb(db) => db.get_by_username(username).await, diff --git a/jmap-proto/src/endpoints/session.rs b/jmap-proto/src/endpoints/session.rs index 82f346f..01d3725 100644 --- a/jmap-proto/src/endpoints/session.rs +++ a/jmap-proto/src/endpoints/session.rs @@ -16,6 +16,7 @@ /// no-store, must-revalidate" on the response. /// /// Exposed from https://${hostname}[:${port}]/.well-known/jmap +#[serde_as] #[derive(Deserialize, Serialize, Clone, Debug)] #[serde(rename_all = "camelCase")] pub struct Session<'a> { @@ -24,11 +25,57 @@ /// each of these keys is an object with further information about the /// server's capabilities in relation to that capability. #[serde(borrow)] - capabilities: ServerCapabilities<'a>, + pub capabilities: ServerCapabilities<'a>, /// A map of an account id to an Account object for each account (see /// Section 1.6.2) the user has access to. + #[serde(borrow)] + pub accounts: HashMap, Account<'a>>, + /// A map of capability URIs (as found in accountCapabilities) to the + /// account id that is considered to be the user's main or default + /// account for data pertaining to that capability. If no account + /// being returned belongs to the user, or in any other way there is + /// no appropriate way to determine a default account, there MAY be no + /// entry for a particular URI, even though that capability is + /// supported by the server (and in the capabilities object). + /// "urn:ietf:params:jmap:core" SHOULD NOT be present. + #[serde_as(as = "HashMap")] + pub primary_accounts: HashMap, Id<'a>>, + /// The username associated with the given credentials, or the empty + /// string if none. + #[serde(borrow)] + pub username: Cow<'a, str>, + /// The URL to use for JMAP API requests. + #[serde(borrow)] + pub api_url: Cow<'a, str>, + /// The URL endpoint to use when downloading files, in URI Template + /// (level 1) format [RFC6570]. The URL MUST contain variables called + /// "accountId", "blobId", "type", and "name". The use of these + /// variables is described in Section 6.2. Due to potential encoding + /// issues with slashes in content types, it is RECOMMENDED to put the + /// "type" variable in the query section of the URL. + #[serde(borrow)] + pub download_url: Cow<'a, str>, + /// The URL endpoint to use when uploading files, in URI Template + /// (level 1) format [RFC6570]. The URL MUST contain a variable + /// called "accountId". The use of this variable is described in + /// Section 6.1. + #[serde(borrow)] + pub upload_url: Cow<'a, str>, + /// The URL to connect to for push events, as described in + /// Section 7.3, in URI Template (level 1) format [RFC6570]. The URL + /// MUST contain variables called "types", "closeafter", and "ping". + /// The use of these variables is described in Section 7.3. + #[serde(borrow)] + pub event_source_url: Cow<'a, str>, + /// A (preferably short) string representing the state of this object + /// on the server. If the value of any other property on the Session + /// object changes, this string will change. The current value is + /// also returned on the API Response object (see Section 3.4), + /// allowing clients to quickly determine if the session information + /// has changed (e.g., an account has been added or removed), so they + /// need to refetch the object. #[serde(borrow)] - accounts: HashMap, Account<'a>>, + pub state: SessionState<'a>, } #[derive(Deserialize, Serialize, Clone, Debug)] @@ -36,7 +83,7 @@ /// The capabilities object MUST include a property called /// "urn:ietf:params:jmap:core". #[serde(rename = "urn:ietf:params:jmap:core", borrow)] - core: CoreCapability<'a>, + pub core: CoreCapability<'a>, } #[serde_as] @@ -46,34 +93,34 @@ /// The maximum file size, in octets, that the server will accept /// for a single file upload (for any purpose). Suggested minimum: /// 50,000,000. - max_size_upload: UnsignedInt, + pub max_size_upload: UnsignedInt, /// The maximum number of concurrent requests the server will /// accept to the upload endpoint. Suggested minimum: 4. - max_concurrent_upload: UnsignedInt, + pub max_concurrent_upload: UnsignedInt, /// The maximum size, in octets, that the server will accept for a /// single request to the API endpoint. Suggested minimum: /// 10,000,000. - max_size_request: UnsignedInt, + pub max_size_request: UnsignedInt, /// The maximum number of concurrent requests the server will /// accept to the API endpoint. Suggested minimum: 4. - max_concurrent_requests: UnsignedInt, + pub max_concurrent_requests: UnsignedInt, /// The maximum number of method calls the server will accept in a /// single request to the API endpoint. Suggested minimum: 16. - max_calls_in_request: UnsignedInt, + pub max_calls_in_request: UnsignedInt, /// The maximum number of objects that the client may request in a /// single /get type method call. Suggested minimum: 500. - max_objects_in_get: UnsignedInt, + pub max_objects_in_get: UnsignedInt, /// The maximum number of objects the client may send to create, /// update, or destroy in a single /set type method call. This is /// the combined total, e.g., if the maximum is 10, you could not /// create 7 objects and destroy 6, as this would be 13 actions, /// which exceeds the limit. Suggested minimum: 500. - max_objects_in_set: UnsignedInt, + pub max_objects_in_set: UnsignedInt, /// A list of identifiers for algorithms registered in the /// collation registry, as defined in [RFC4790], that the server /// supports for sorting when querying records. #[serde_as(as = "BTreeSet")] - collation_algorithms: BTreeSet>, + pub collation_algorithms: BTreeSet>, } #[serde_as] @@ -84,66 +131,20 @@ /// this account, e.g., the email address representing the owner of /// the account. #[serde(borrow)] - name: Cow<'a, str>, + pub name: Cow<'a, str>, /// This is true if the account belongs to the authenticated user /// rather than a group account or a personal account of another /// user that has been shared with them. - is_personal: bool, + pub is_personal: bool, /// This is true if the entire account is read-only. - is_read_only: bool, + pub is_read_only: bool, /// The set of capability URIs for the methods supported in this /// account. Each key is a URI for a capability that has methods /// you can use with this account. The value for each of these /// keys is an object with further information about the account's /// permissions and restrictions with respect to this capability, /// as defined in the capability's specification. - account_capabilities: AccountCapabilities, - /// A map of capability URIs (as found in accountCapabilities) to the - /// account id that is considered to be the user's main or default - /// account for data pertaining to that capability. If no account - /// being returned belongs to the user, or in any other way there is - /// no appropriate way to determine a default account, there MAY be no - /// entry for a particular URI, even though that capability is - /// supported by the server (and in the capabilities object). - /// "urn:ietf:params:jmap:core" SHOULD NOT be present. - #[serde_as(as = "HashMap")] - primary_accounts: HashMap, Id<'a>>, - /// The username associated with the given credentials, or the empty - /// string if none. - #[serde(borrow)] - username: Cow<'a, str>, - /// The URL to use for JMAP API requests. - #[serde(borrow)] - api_url: Cow<'a, str>, - /// The URL endpoint to use when downloading files, in URI Template - /// (level 1) format [RFC6570]. The URL MUST contain variables called - /// "accountId", "blobId", "type", and "name". The use of these - /// variables is described in Section 6.2. Due to potential encoding - /// issues with slashes in content types, it is RECOMMENDED to put the - /// "type" variable in the query section of the URL. - #[serde(borrow)] - download_url: Cow<'a, str>, - /// The URL endpoint to use when uploading files, in URI Template - /// (level 1) format [RFC6570]. The URL MUST contain a variable - /// called "accountId". The use of this variable is described in - /// Section 6.1. - #[serde(borrow)] - upload_url: Cow<'a, str>, - /// The URL to connect to for push events, as described in - /// Section 7.3, in URI Template (level 1) format [RFC6570]. The URL - /// MUST contain variables called "types", "closeafter", and "ping". - /// The use of these variables is described in Section 7.3. - #[serde(borrow)] - event_source_url: Cow<'a, str>, - /// A (preferably short) string representing the state of this object - /// on the server. If the value of any other property on the Session - /// object changes, this string will change. The current value is - /// also returned on the API Response object (see Section 3.4), - /// allowing clients to quickly determine if the session information - /// has changed (e.g., an account has been added or removed), so they - /// need to refetch the object. - #[serde(borrow)] - state: SessionState<'a>, + pub account_capabilities: AccountCapabilities, } #[derive(Serialize, Deserialize, Clone, Debug)] diff --git a/jogre-server/src/layers/auth_required.rs b/jogre-server/src/layers/auth_required.rs index 75c68b0..5285ba4 100644 --- a/jogre-server/src/layers/auth_required.rs +++ a/jogre-server/src/layers/auth_required.rs @@ -35,5 +35,7 @@ debug!(?grant, "Request authorized"); + request.extensions_mut().insert(grant); + next.run(request).await } diff --git a/jogre-server/src/methods/session.rs b/jogre-server/src/methods/session.rs index 12c1120..53b7581 100644 --- a/jogre-server/src/methods/session.rs +++ a/jogre-server/src/methods/session.rs @@ -1,3 +1,119 @@ -pub async fn get() -> &'static str { - "hello world" +use std::sync::{Arc, OnceLock}; + +use axum::{extract::State, Extension, Json}; +use jmap_proto::{ + common::{Id, SessionState}, + endpoints::session::{ + Account, AccountCapabilities, CoreCapability, ServerCapabilities, Session, + }, +}; +use oxide_auth::primitives::grant::Grant; + +use crate::{ + context::Context, + store::{AccountProvider, UserProvider}, +}; + +static API_URL: OnceLock> = OnceLock::new(); +static DOWNLOAD_URL: OnceLock> = OnceLock::new(); +static UPLOAD_URL: OnceLock> = OnceLock::new(); +static EVENT_SOURCE_URL: OnceLock> = OnceLock::new(); + +pub async fn get( + State(context): State>, + Extension(grant): Extension, +) -> Json> { + let username = grant.owner_id; + + let user = context + .store + .get_by_username(&username) + .await + .unwrap() + .unwrap(); + + let (accounts, user_seq_number) = tokio::join!( + async { + context + .store + .get_accounts_for_user(user.id) + .await + .unwrap() + .into_iter() + .map(|acc| { + ( + Id(acc.id.to_string().into()), + Account { + name: acc.name.into(), + is_personal: acc.is_personal, + is_read_only: acc.is_read_only, + account_capabilities: AccountCapabilities {}, + }, + ) + }) + .collect() + }, + async { + context + .store + .fetch_seq_number_for_user(user.id) + .await + .unwrap() + } + ); + + Json(Session { + capabilities: ServerCapabilities { + core: CoreCapability { + max_size_upload: context.core_capabilities.max_size_upload.into(), + max_concurrent_upload: context.core_capabilities.max_concurrent_upload.into(), + max_size_request: context.core_capabilities.max_size_request.into(), + max_concurrent_requests: context.core_capabilities.max_concurrent_requests.into(), + max_calls_in_request: context.core_capabilities.max_calls_in_request.into(), + max_objects_in_get: context.core_capabilities.max_objects_in_get.into(), + max_objects_in_set: context.core_capabilities.max_objects_in_set.into(), + collation_algorithms: Default::default(), + }, + }, + accounts, + primary_accounts: Default::default(), + username: username.into(), + api_url: API_URL + .get_or_init(|| { + context + .base_url + .join("api/") + .unwrap() + .to_string() + .into_boxed_str() + }) + .as_ref() + .into(), + download_url: DOWNLOAD_URL + .get_or_init(|| { + let base = context.base_url.join("download/").unwrap(); + format!("{base}{{accountId}}/{{blobId}}/{{name}}?accept={{type}}").into_boxed_str() + }) + .as_ref() + .into(), + upload_url: UPLOAD_URL + .get_or_init(|| { + let base = context.base_url.join("upload/").unwrap(); + format!("{base}{{accountId}}/").into_boxed_str() + }) + .as_ref() + .into(), + event_source_url: EVENT_SOURCE_URL + .get_or_init(|| { + context + .base_url + .join("eventsource/?types={types}&closeafter={closeafter}&ping={ping}") + .unwrap() + .to_string() + .into_boxed_str() + }) + .as_ref() + .into(), + state: SessionState(user_seq_number.to_string().into()), + }) } diff --git a/jogre-server/src/store/rocksdb.rs b/jogre-server/src/store/rocksdb.rs index 76fe878..e8813df 100644 --- a/jogre-server/src/store/rocksdb.rs +++ a/jogre-server/src/store/rocksdb.rs @@ -1,17 +1,22 @@ use std::path::PathBuf; use axum::async_trait; -use rocksdb::{IteratorMode, Options, DB}; +use rocksdb::{IteratorMode, MergeOperands, Options, DB}; use serde::Deserialize; +use uuid::Uuid; -use crate::store::{User, UserProvider}; +use crate::store::{Account, AccountAccessLevel, AccountProvider, User, UserProvider}; #[derive(Debug)] pub enum Error {} const USER_BY_USERNAME_CF: &str = "users_by_username"; const USER_BY_UUID_CF: &str = "users_by_uuid"; +const USER_SEQ_NUMBER: &str = "users_seq_number"; +const ACCOUNTS_BY_UUID: &str = "accounts_by_uuid"; +const ACCOUNTS_ACCESS_BY_USER: &str = "accounts_access_by_user"; + const BINCODE_CONFIG: bincode::config::Configuration = bincode::config::standard(); #[derive(Deserialize)] @@ -29,22 +34,168 @@ pub fn new(config: Config) -> Self { let mut db_options = Options::default(); db_options.create_if_missing(true); + db_options.set_merge_operator_associative("test operator", rocksdb_merger); db_options.create_missing_column_families(true); - let db = DB::open_cf( + let db = DB::open_cf_with_opts( &db_options, config.path, - [USER_BY_USERNAME_CF, USER_BY_UUID_CF], + [ + (USER_BY_USERNAME_CF, db_options.clone()), + (USER_BY_UUID_CF, db_options.clone()), + (ACCOUNTS_BY_UUID, db_options.clone()), + (ACCOUNTS_ACCESS_BY_USER, db_options.clone()), + (USER_SEQ_NUMBER, db_options.clone()), + ], ) .unwrap(); Self { db } + } +} + +fn rocksdb_merger( + _new_key: &[u8], + existing_val: Option<&[u8]>, + operands: &MergeOperands, +) -> Option> { + let mut new_val = existing_val.map(|v| v.to_vec()).unwrap_or_default(); + + for operand in operands { + let (operation, operand) = MergeOperation::parse(operand); + + match operation { + Some(MergeOperation::Increment) => { + if new_val.is_empty() { + new_val.extend_from_slice(&0_u64.to_be_bytes()); + } + + let mut carry = true; + + for byte in new_val.iter_mut().rev() { + if carry { + *byte = byte.wrapping_add(1); + carry = *byte == 0; + } else { + break; + } + } + + if carry { + new_val.fill(0); + } + } + None => { + panic!("unknown operand: {operand:?}"); + } + } + } + + Some(new_val) +} + +enum MergeOperation { + Increment, +} + +impl MergeOperation { + pub fn parse(v: &[u8]) -> (Option, &[u8]) { + if v == b"INCR" { + (Some(Self::Increment), &[]) + } else { + (None, v) + } + } +} + +#[async_trait] +impl AccountProvider for RocksDb { + type Error = Error; + + async fn create_account(&self, account: Account) -> Result<(), Self::Error> { + let bytes = bincode::serde::encode_to_vec(&account, BINCODE_CONFIG).unwrap(); + + let by_uuid_handle = self.db.cf_handle(ACCOUNTS_BY_UUID).unwrap(); + self.db + .put_cf(by_uuid_handle, account.id.as_bytes(), bytes) + .unwrap(); + + Ok(()) + } + + async fn attach_account_to_user( + &self, + account: Uuid, + user: Uuid, + access: AccountAccessLevel, + ) -> Result<(), Self::Error> { + { + let access_handle = self.db.cf_handle(ACCOUNTS_ACCESS_BY_USER).unwrap(); + + let mut compound_key = [0_u8; 32]; + compound_key[..16].copy_from_slice(user.as_bytes()); + compound_key[16..].copy_from_slice(account.as_bytes()); + + self.db + .put_cf(access_handle, compound_key, (access as u8).to_be_bytes()) + .unwrap(); + } + + self.increment_seq_number_for_user(user).await.unwrap(); + + Ok(()) + } + + async fn get_accounts_for_user(&self, user_id: Uuid) -> Result, Self::Error> { + let access_handle = self.db.cf_handle(ACCOUNTS_ACCESS_BY_USER).unwrap(); + let account_handle = self.db.cf_handle(ACCOUNTS_BY_UUID).unwrap(); + + Ok(self + .db + .prefix_iterator_cf(access_handle, user_id.as_bytes()) + .map(|v| v.unwrap()) + .filter_map(|(key, _access_level)| { + let Some(account) = key.strip_prefix(user_id.as_bytes()) else { + panic!("got invalid key from rocksdb"); + }; + + let Some(account_bytes) = self.db.get_cf(account_handle, account).unwrap() else { + return None; + }; + + let (res, _): (Account, _) = + bincode::serde::decode_from_slice(&account_bytes, BINCODE_CONFIG).unwrap(); + + Some(res) + }) + .collect()) } } #[async_trait] impl UserProvider for RocksDb { type Error = Error; + + async fn increment_seq_number_for_user(&self, user: Uuid) -> Result<(), Self::Error> { + let seq_handle = self.db.cf_handle(USER_SEQ_NUMBER).unwrap(); + self.db + .merge_cf(seq_handle, user.as_bytes(), "INCR") + .unwrap(); + Ok(()) + } + + async fn fetch_seq_number_for_user(&self, user: Uuid) -> Result { + let seq_handle = self.db.cf_handle(USER_SEQ_NUMBER).unwrap(); + + let Some(bytes) = self.db.get_pinned_cf(seq_handle, user.as_bytes()).unwrap() else { + return Ok(0); + }; + + let mut val = [0_u8; std::mem::size_of::()]; + val.copy_from_slice(&bytes); + + Ok(u64::from_be_bytes(val)) + } async fn has_any_users(&self) -> Result { let by_uuid_handle = self.db.cf_handle(USER_BY_UUID_CF).unwrap(); -- rgit 0.1.3