Implement session endpoint
Diff
Cargo.lock | 1 +
jogre-server/Cargo.toml | 4 +++-
jogre-server/config.toml | 1 +
jmap-proto/src/common.rs | 10 ++++++++--
jogre-server/src/config.rs | 88 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
jogre-server/src/context.rs | 9 ++++++++-
jogre-server/src/main.rs | 20 +++++++++++++++++---
jogre-server/src/store.rs | 105 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
jmap-proto/src/endpoints/session.rs | 123 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-----------
jogre-server/src/layers/auth_required.rs | 2 ++
jogre-server/src/methods/session.rs | 120 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
jogre-server/src/store/rocksdb.rs | 159 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
12 files changed, 567 insertions(+), 75 deletions(-)
@@ -922,6 +922,7 @@
"futures",
"hex",
"hmac",
"jmap-proto",
"oxide-auth",
"oxide-auth-axum",
"rand",
@@ -6,6 +6,8 @@
[dependencies]
jmap-proto = { path = "../jmap-proto" }
argon2 = "0.5"
askama = "0.12"
axum = "0.6"
@@ -25,7 +27,7 @@
toml = "0.8"
tracing = "0.1"
tracing-subscriber = "0.3"
url = "2.4"
url = { version = "2.4", features = ["serde"] }
uuid = { version = "1.4", features = ["v4", "serde"] }
serde = { version = "1.0.188", features = ["derive"] }
sha3 = "0.10"
@@ -1,4 +1,5 @@
private-key = "mycoolatleast32byteprivatekey"
base-url = "http://127.0.0.1:8888"
[store]
type = "rocksdb"
@@ -14,6 +14,12 @@
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize, Hash)]
pub struct UnsignedInt(u64);
impl From<u64> for UnsignedInt {
fn from(value: u64) -> Self {
Self(value)
}
}
@@ -40,7 +46,7 @@
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Hash)]
pub struct Id<'a>(#[serde(borrow)] Cow<'a, str>);
pub struct Id<'a>(#[serde(borrow)] pub Cow<'a, str>);
@@ -64,4 +70,4 @@
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct SessionState<'a>(#[serde(borrow)] Cow<'a, str>);
pub struct SessionState<'a>(#[serde(borrow)] pub Cow<'a, str>);
@@ -17,4 +17,92 @@
pub store: StoreConfig,
#[serde(default)]
pub core_capabilities: CoreCapabilities,
pub base_url: url::Url,
}
#[derive(Deserialize, Copy, Clone, Debug)]
#[serde(rename_all = "kebab-case")]
pub struct CoreCapabilities {
#[serde(default = "CoreCapabilities::default_max_size_upload")]
pub max_size_upload: u64,
#[serde(default = "CoreCapabilities::default_max_concurrent_upload")]
pub max_concurrent_upload: u64,
#[serde(default = "CoreCapabilities::default_max_size_request")]
pub max_size_request: u64,
#[serde(default = "CoreCapabilities::default_max_concurrent_requests")]
pub max_concurrent_requests: u64,
#[serde(default = "CoreCapabilities::default_max_calls_in_request")]
pub max_calls_in_request: u64,
#[serde(default = "CoreCapabilities::default_max_objects_in_get")]
pub max_objects_in_get: u64,
#[serde(default = "CoreCapabilities::default_max_objects_in_set")]
pub max_objects_in_set: u64,
}
impl Default for CoreCapabilities {
fn default() -> Self {
Self {
max_size_upload: Self::default_max_size_upload(),
max_concurrent_upload: Self::default_max_concurrent_upload(),
max_size_request: Self::default_max_size_request(),
max_concurrent_requests: Self::default_max_concurrent_requests(),
max_calls_in_request: Self::default_max_calls_in_request(),
max_objects_in_get: Self::default_max_objects_in_get(),
max_objects_in_set: Self::default_max_objects_in_set(),
}
}
}
impl CoreCapabilities {
const fn default_max_size_upload() -> u64 {
50_000_000
}
const fn default_max_concurrent_upload() -> u64 {
4
}
const fn default_max_size_request() -> u64 {
10_000_000
}
const fn default_max_concurrent_requests() -> u64 {
4
}
const fn default_max_calls_in_request() -> u64 {
16
}
const fn default_max_objects_in_get() -> u64 {
500
}
const fn default_max_objects_in_set() -> u64 {
500
}
}
@@ -1,12 +1,17 @@
use std::sync::Arc;
use crate::{config::Config, store::Store};
use crate::{
config::{Config, CoreCapabilities},
store::Store,
};
pub mod oauth2;
pub struct Context {
pub oauth2: oauth2::OAuth2,
pub store: Arc<Store>,
pub base_url: url::Url,
pub core_capabilities: CoreCapabilities,
}
impl Context {
@@ -17,6 +22,8 @@
Self {
oauth2: oauth2::OAuth2::new(store.clone(), derived_keys),
store,
base_url: config.base_url,
core_capabilities: config.core_capabilities,
}
}
}
@@ -11,7 +11,10 @@
use rand::RngCore;
use tracing::info;
use crate::{context::Context, store::UserProvider};
use crate::{
context::Context,
store::{AccountAccessLevel, AccountProvider, UserProvider},
};
#[derive(Parser, Debug)]
#[clap(author, version, about)]
@@ -54,6 +57,17 @@
info!("User root created with password {password}");
let root = store::User::new("root".into(), &password);
context.store.create_user(root).await.unwrap();
let root_user = store::User::new("root".into(), &password);
let root_user_id = root_user.id;
context.store.create_user(root_user).await.unwrap();
let root_account = store::Account::new("root".into(), true, false);
let root_account_id = root_account.id;
context.store.create_account(root_account).await.unwrap();
context
.store
.attach_account_to_user(root_account_id, root_user_id, AccountAccessLevel::Owner)
.await
.unwrap();
}
@@ -6,14 +6,20 @@
use serde::{Deserialize, Serialize};
use uuid::Uuid;
#[derive(Serialize, Deserialize)]
pub struct User {
id: Uuid,
pub id: Uuid,
pub username: String,
password: String,
}
impl User {
pub fn new(username: String, password: &str) -> Self {
let password = Argon2::default()
.hash_password(password.as_bytes(), &SaltString::generate(&mut OsRng))
@@ -27,6 +33,7 @@
}
}
pub fn verify_password(&self, password: &str) -> bool {
let parsed_hash = PasswordHash::new(&self.password).unwrap();
Argon2::default()
@@ -39,13 +46,65 @@
pub trait UserProvider {
type Error;
async fn increment_seq_number_for_user(&self, user: Uuid) -> Result<(), Self::Error>;
async fn fetch_seq_number_for_user(&self, user: Uuid) -> Result<u64, Self::Error>;
async fn has_any_users(&self) -> Result<bool, Self::Error>;
async fn create_user(&self, user: User) -> Result<(), Self::Error>;
async fn get_by_username(&self, username: &str) -> Result<Option<User>, Self::Error>;
}
#[derive(Serialize, Deserialize, Debug)]
pub struct Account {
pub id: Uuid,
pub name: String,
pub is_personal: bool,
pub is_read_only: bool,
}
impl Account {
pub fn new(name: String, is_personal: bool, is_read_only: bool) -> Self {
Self {
id: Uuid::new_v4(),
name,
is_personal,
is_read_only,
}
}
}
#[async_trait]
pub trait AccountProvider {
type Error;
async fn create_account(&self, account: Account) -> Result<(), Self::Error>;
async fn attach_account_to_user(
&self,
account: Uuid,
user: Uuid,
access: AccountAccessLevel,
) -> Result<(), Self::Error>;
async fn get_accounts_for_user(&self, user_id: Uuid) -> Result<Vec<Account>, Self::Error>;
}
#[repr(u8)]
pub enum AccountAccessLevel {
Owner,
}
#[derive(Deserialize)]
#[serde(tag = "type")]
pub enum StoreConfig {
@@ -61,6 +120,34 @@
pub fn from_config(config: StoreConfig) -> Self {
match config {
StoreConfig::RocksDb(config) => Self::RocksDb(rocksdb::RocksDb::new(config)),
}
}
}
#[async_trait]
impl AccountProvider for Store {
type Error = rocksdb::Error;
async fn create_account(&self, account: Account) -> Result<(), Self::Error> {
match self {
Store::RocksDb(db) => db.create_account(account).await,
}
}
async fn attach_account_to_user(
&self,
account: Uuid,
user: Uuid,
access: AccountAccessLevel,
) -> Result<(), Self::Error> {
match self {
Store::RocksDb(db) => db.attach_account_to_user(account, user, access).await,
}
}
async fn get_accounts_for_user(&self, user_id: Uuid) -> Result<Vec<Account>, Self::Error> {
match self {
Store::RocksDb(db) => db.get_accounts_for_user(user_id).await,
}
}
}
@@ -68,19 +155,35 @@
#[async_trait]
impl UserProvider for Store {
type Error = rocksdb::Error;
async fn increment_seq_number_for_user(&self, user: Uuid) -> Result<(), Self::Error> {
match self {
Store::RocksDb(db) => db.increment_seq_number_for_user(user).await,
}
}
async fn fetch_seq_number_for_user(&self, user: Uuid) -> Result<u64, Self::Error> {
match self {
Store::RocksDb(db) => db.fetch_seq_number_for_user(user).await,
}
}
async fn has_any_users(&self) -> Result<bool, Self::Error> {
match self {
Store::RocksDb(db) => db.has_any_users().await,
}
}
async fn create_user(&self, user: User) -> Result<(), Self::Error> {
match self {
Store::RocksDb(db) => db.create_user(user).await,
}
}
async fn get_by_username(&self, username: &str) -> Result<Option<User>, Self::Error> {
match self {
Store::RocksDb(db) => db.get_by_username(username).await,
@@ -16,6 +16,7 @@
#[serde_as]
#[derive(Deserialize, Serialize, Clone, Debug)]
#[serde(rename_all = "camelCase")]
pub struct Session<'a> {
@@ -24,11 +25,57 @@
#[serde(borrow)]
capabilities: ServerCapabilities<'a>,
pub capabilities: ServerCapabilities<'a>,
#[serde(borrow)]
pub accounts: HashMap<Id<'a>, Account<'a>>,
#[serde_as(as = "HashMap<BorrowCow, _>")]
pub primary_accounts: HashMap<Cow<'a, str>, Id<'a>>,
#[serde(borrow)]
pub username: Cow<'a, str>,
#[serde(borrow)]
pub api_url: Cow<'a, str>,
#[serde(borrow)]
pub download_url: Cow<'a, str>,
#[serde(borrow)]
pub upload_url: Cow<'a, str>,
#[serde(borrow)]
pub event_source_url: Cow<'a, str>,
#[serde(borrow)]
accounts: HashMap<Id<'a>, Account<'a>>,
pub state: SessionState<'a>,
}
#[derive(Deserialize, Serialize, Clone, Debug)]
@@ -36,7 +83,7 @@
#[serde(rename = "urn:ietf:params:jmap:core", borrow)]
core: CoreCapability<'a>,
pub core: CoreCapability<'a>,
}
#[serde_as]
@@ -46,34 +93,34 @@
max_size_upload: UnsignedInt,
pub max_size_upload: UnsignedInt,
max_concurrent_upload: UnsignedInt,
pub max_concurrent_upload: UnsignedInt,
max_size_request: UnsignedInt,
pub max_size_request: UnsignedInt,
max_concurrent_requests: UnsignedInt,
pub max_concurrent_requests: UnsignedInt,
max_calls_in_request: UnsignedInt,
pub max_calls_in_request: UnsignedInt,
max_objects_in_get: UnsignedInt,
pub max_objects_in_get: UnsignedInt,
max_objects_in_set: UnsignedInt,
pub max_objects_in_set: UnsignedInt,
#[serde_as(as = "BTreeSet<BorrowCow>")]
collation_algorithms: BTreeSet<Cow<'a, str>>,
pub collation_algorithms: BTreeSet<Cow<'a, str>>,
}
#[serde_as]
@@ -84,66 +131,20 @@
#[serde(borrow)]
name: Cow<'a, str>,
pub name: Cow<'a, str>,
is_personal: bool,
pub is_personal: bool,
is_read_only: bool,
pub is_read_only: bool,
account_capabilities: AccountCapabilities,
#[serde_as(as = "HashMap<BorrowCow, _>")]
primary_accounts: HashMap<Cow<'a, str>, Id<'a>>,
#[serde(borrow)]
username: Cow<'a, str>,
#[serde(borrow)]
api_url: Cow<'a, str>,
#[serde(borrow)]
download_url: Cow<'a, str>,
#[serde(borrow)]
upload_url: Cow<'a, str>,
#[serde(borrow)]
event_source_url: Cow<'a, str>,
#[serde(borrow)]
state: SessionState<'a>,
pub account_capabilities: AccountCapabilities,
}
#[derive(Serialize, Deserialize, Clone, Debug)]
@@ -35,5 +35,7 @@
debug!(?grant, "Request authorized");
request.extensions_mut().insert(grant);
next.run(request).await
}
@@ -1,3 +1,119 @@
pub async fn get() -> &'static str {
"hello world"
use std::sync::{Arc, OnceLock};
use axum::{extract::State, Extension, Json};
use jmap_proto::{
common::{Id, SessionState},
endpoints::session::{
Account, AccountCapabilities, CoreCapability, ServerCapabilities, Session,
},
};
use oxide_auth::primitives::grant::Grant;
use crate::{
context::Context,
store::{AccountProvider, UserProvider},
};
static API_URL: OnceLock<Box<str>> = OnceLock::new();
static DOWNLOAD_URL: OnceLock<Box<str>> = OnceLock::new();
static UPLOAD_URL: OnceLock<Box<str>> = OnceLock::new();
static EVENT_SOURCE_URL: OnceLock<Box<str>> = OnceLock::new();
pub async fn get(
State(context): State<Arc<Context>>,
Extension(grant): Extension<Grant>,
) -> Json<Session<'static>> {
let username = grant.owner_id;
let user = context
.store
.get_by_username(&username)
.await
.unwrap()
.unwrap();
let (accounts, user_seq_number) = tokio::join!(
async {
context
.store
.get_accounts_for_user(user.id)
.await
.unwrap()
.into_iter()
.map(|acc| {
(
Id(acc.id.to_string().into()),
Account {
name: acc.name.into(),
is_personal: acc.is_personal,
is_read_only: acc.is_read_only,
account_capabilities: AccountCapabilities {},
},
)
})
.collect()
},
async {
context
.store
.fetch_seq_number_for_user(user.id)
.await
.unwrap()
}
);
Json(Session {
capabilities: ServerCapabilities {
core: CoreCapability {
max_size_upload: context.core_capabilities.max_size_upload.into(),
max_concurrent_upload: context.core_capabilities.max_concurrent_upload.into(),
max_size_request: context.core_capabilities.max_size_request.into(),
max_concurrent_requests: context.core_capabilities.max_concurrent_requests.into(),
max_calls_in_request: context.core_capabilities.max_calls_in_request.into(),
max_objects_in_get: context.core_capabilities.max_objects_in_get.into(),
max_objects_in_set: context.core_capabilities.max_objects_in_set.into(),
collation_algorithms: Default::default(),
},
},
accounts,
primary_accounts: Default::default(),
username: username.into(),
api_url: API_URL
.get_or_init(|| {
context
.base_url
.join("api/")
.unwrap()
.to_string()
.into_boxed_str()
})
.as_ref()
.into(),
download_url: DOWNLOAD_URL
.get_or_init(|| {
let base = context.base_url.join("download/").unwrap();
format!("{base}{{accountId}}/{{blobId}}/{{name}}?accept={{type}}").into_boxed_str()
})
.as_ref()
.into(),
upload_url: UPLOAD_URL
.get_or_init(|| {
let base = context.base_url.join("upload/").unwrap();
format!("{base}{{accountId}}/").into_boxed_str()
})
.as_ref()
.into(),
event_source_url: EVENT_SOURCE_URL
.get_or_init(|| {
context
.base_url
.join("eventsource/?types={types}&closeafter={closeafter}&ping={ping}")
.unwrap()
.to_string()
.into_boxed_str()
})
.as_ref()
.into(),
state: SessionState(user_seq_number.to_string().into()),
})
}
@@ -1,17 +1,22 @@
use std::path::PathBuf;
use axum::async_trait;
use rocksdb::{IteratorMode, Options, DB};
use rocksdb::{IteratorMode, MergeOperands, Options, DB};
use serde::Deserialize;
use uuid::Uuid;
use crate::store::{User, UserProvider};
use crate::store::{Account, AccountAccessLevel, AccountProvider, User, UserProvider};
#[derive(Debug)]
pub enum Error {}
const USER_BY_USERNAME_CF: &str = "users_by_username";
const USER_BY_UUID_CF: &str = "users_by_uuid";
const USER_SEQ_NUMBER: &str = "users_seq_number";
const ACCOUNTS_BY_UUID: &str = "accounts_by_uuid";
const ACCOUNTS_ACCESS_BY_USER: &str = "accounts_access_by_user";
const BINCODE_CONFIG: bincode::config::Configuration = bincode::config::standard();
#[derive(Deserialize)]
@@ -29,22 +34,168 @@
pub fn new(config: Config) -> Self {
let mut db_options = Options::default();
db_options.create_if_missing(true);
db_options.set_merge_operator_associative("test operator", rocksdb_merger);
db_options.create_missing_column_families(true);
let db = DB::open_cf(
let db = DB::open_cf_with_opts(
&db_options,
config.path,
[USER_BY_USERNAME_CF, USER_BY_UUID_CF],
[
(USER_BY_USERNAME_CF, db_options.clone()),
(USER_BY_UUID_CF, db_options.clone()),
(ACCOUNTS_BY_UUID, db_options.clone()),
(ACCOUNTS_ACCESS_BY_USER, db_options.clone()),
(USER_SEQ_NUMBER, db_options.clone()),
],
)
.unwrap();
Self { db }
}
}
fn rocksdb_merger(
_new_key: &[u8],
existing_val: Option<&[u8]>,
operands: &MergeOperands,
) -> Option<Vec<u8>> {
let mut new_val = existing_val.map(|v| v.to_vec()).unwrap_or_default();
for operand in operands {
let (operation, operand) = MergeOperation::parse(operand);
match operation {
Some(MergeOperation::Increment) => {
if new_val.is_empty() {
new_val.extend_from_slice(&0_u64.to_be_bytes());
}
let mut carry = true;
for byte in new_val.iter_mut().rev() {
if carry {
*byte = byte.wrapping_add(1);
carry = *byte == 0;
} else {
break;
}
}
if carry {
new_val.fill(0);
}
}
None => {
panic!("unknown operand: {operand:?}");
}
}
}
Some(new_val)
}
enum MergeOperation {
Increment,
}
impl MergeOperation {
pub fn parse(v: &[u8]) -> (Option<MergeOperation>, &[u8]) {
if v == b"INCR" {
(Some(Self::Increment), &[])
} else {
(None, v)
}
}
}
#[async_trait]
impl AccountProvider for RocksDb {
type Error = Error;
async fn create_account(&self, account: Account) -> Result<(), Self::Error> {
let bytes = bincode::serde::encode_to_vec(&account, BINCODE_CONFIG).unwrap();
let by_uuid_handle = self.db.cf_handle(ACCOUNTS_BY_UUID).unwrap();
self.db
.put_cf(by_uuid_handle, account.id.as_bytes(), bytes)
.unwrap();
Ok(())
}
async fn attach_account_to_user(
&self,
account: Uuid,
user: Uuid,
access: AccountAccessLevel,
) -> Result<(), Self::Error> {
{
let access_handle = self.db.cf_handle(ACCOUNTS_ACCESS_BY_USER).unwrap();
let mut compound_key = [0_u8; 32];
compound_key[..16].copy_from_slice(user.as_bytes());
compound_key[16..].copy_from_slice(account.as_bytes());
self.db
.put_cf(access_handle, compound_key, (access as u8).to_be_bytes())
.unwrap();
}
self.increment_seq_number_for_user(user).await.unwrap();
Ok(())
}
async fn get_accounts_for_user(&self, user_id: Uuid) -> Result<Vec<Account>, Self::Error> {
let access_handle = self.db.cf_handle(ACCOUNTS_ACCESS_BY_USER).unwrap();
let account_handle = self.db.cf_handle(ACCOUNTS_BY_UUID).unwrap();
Ok(self
.db
.prefix_iterator_cf(access_handle, user_id.as_bytes())
.map(|v| v.unwrap())
.filter_map(|(key, _access_level)| {
let Some(account) = key.strip_prefix(user_id.as_bytes()) else {
panic!("got invalid key from rocksdb");
};
let Some(account_bytes) = self.db.get_cf(account_handle, account).unwrap() else {
return None;
};
let (res, _): (Account, _) =
bincode::serde::decode_from_slice(&account_bytes, BINCODE_CONFIG).unwrap();
Some(res)
})
.collect())
}
}
#[async_trait]
impl UserProvider for RocksDb {
type Error = Error;
async fn increment_seq_number_for_user(&self, user: Uuid) -> Result<(), Self::Error> {
let seq_handle = self.db.cf_handle(USER_SEQ_NUMBER).unwrap();
self.db
.merge_cf(seq_handle, user.as_bytes(), "INCR")
.unwrap();
Ok(())
}
async fn fetch_seq_number_for_user(&self, user: Uuid) -> Result<u64, Self::Error> {
let seq_handle = self.db.cf_handle(USER_SEQ_NUMBER).unwrap();
let Some(bytes) = self.db.get_pinned_cf(seq_handle, user.as_bytes()).unwrap() else {
return Ok(0);
};
let mut val = [0_u8; std::mem::size_of::<u64>()];
val.copy_from_slice(&bytes);
Ok(u64::from_be_bytes(val))
}
async fn has_any_users(&self) -> Result<bool, Self::Error> {
let by_uuid_handle = self.db.cf_handle(USER_BY_UUID_CF).unwrap();