From 69b2275a0aa9eaff5134777f8a228c2132df02e1 Mon Sep 17 00:00:00 2001 From: Toby Vincent Date: Tue, 7 May 2024 14:30:23 -0500 Subject: refactor(wip): experiment with rocksdb --- src/api.rs | 8 ++++-- src/api/account.rs | 16 ++++++++--- src/api/users.rs | 83 +++++++++++++++++++++++++++++++----------------------- src/auth.rs | 8 ++++-- src/main.rs | 2 +- src/state.rs | 40 ++++++++++++++++++++++++-- 6 files changed, 110 insertions(+), 47 deletions(-) (limited to 'src') diff --git a/src/api.rs b/src/api.rs index a35fba5..0d3f446 100644 --- a/src/api.rs +++ b/src/api.rs @@ -40,7 +40,9 @@ mod tests { async fn test_healthcheck_ok(pool: PgPool) -> TestResult { setup_test_env(); - let router = Router::new().merge(router()).with_state(AppState { pool }); + let router = Router::new() + .merge(router()) + .with_state(AppState::with_pool(pool, "./rocks.db")); let request = Request::builder().uri("/healthcheck").body(Body::empty())?; @@ -55,7 +57,9 @@ mod tests { async fn test_fallback_not_found(pool: PgPool) -> TestResult { setup_test_env(); - let router = Router::new().merge(router()).with_state(AppState { pool }); + let router = Router::new() + .merge(router()) + .with_state(AppState::with_pool(pool, "./rocks.db")); let request = Request::builder() .uri("/does-not-exist") diff --git a/src/api/account.rs b/src/api/account.rs index bae7c54..4fb2675 100644 --- a/src/api/account.rs +++ b/src/api/account.rs @@ -97,7 +97,9 @@ mod tests { async fn test_login_ok(pool: PgPool) -> TestResult { setup_test_env(); - let router = Router::new().merge(router()).with_state(AppState { pool }); + let router = Router::new() + .merge(router()) + .with_state(AppState::with_pool(pool, "./rocks.db")); let auth = Authorization::basic(USER_EMAIL, USER_PASSWORD); @@ -141,7 +143,9 @@ mod tests { async fn test_login_unauthorized(pool: PgPool) -> TestResult { setup_test_env(); - let router = Router::new().merge(router()).with_state(AppState { pool }); + let router = Router::new() + .merge(router()) + .with_state(AppState::with_pool(pool, "./rocks.db")); let auth = Authorization::basic(USER_EMAIL, "hunter2"); @@ -162,7 +166,9 @@ mod tests { async fn test_login_not_found(pool: PgPool) -> TestResult { setup_test_env(); - let router = Router::new().merge(router()).with_state(AppState { pool }); + let router = Router::new() + .merge(router()) + .with_state(AppState::with_pool(pool, "./rocks.db")); let auth = Authorization::basic(USER_EMAIL, USER_PASSWORD); @@ -183,7 +189,9 @@ mod tests { async fn test_logout_ok(pool: PgPool) -> TestResult { setup_test_env(); - let router = Router::new().merge(router()).with_state(AppState { pool }); + let router = Router::new() + .merge(router()) + .with_state(AppState::with_pool(pool, "./rocks.db")); let request = Request::builder() .uri("/logout") diff --git a/src/api/users.rs b/src/api/users.rs index e07bf7e..24bcf97 100644 --- a/src/api/users.rs +++ b/src/api/users.rs @@ -13,7 +13,7 @@ use uuid::Uuid; use crate::{ auth::{credentials::Credential, AccessClaims}, - state::AppState, + state::{AppState, KVStore}, }; use super::error::Error; @@ -40,54 +40,53 @@ pub struct Registration { } pub async fn create( - State(pool): State, + State(state): State, Json(Registration { name, email, password, }): Json, -) -> impl IntoResponse { +) -> Result { email_address::EmailAddress::from_str(&email)?; - let exists: Option = sqlx::query_scalar!( - "SELECT EXISTS(SELECT 1 FROM user_ WHERE email = $1 LIMIT 1)", - email.to_ascii_lowercase() - ) - .fetch_one(&pool) - .await?; - - if exists.is_some_and(|b| b) { - return Err(Error::EmailExists); - } - // TODO: Move this into a micro service, possibly behind a feature flag. let (status, (access, refresh)) = - crate::auth::credentials::create(State(pool.clone()), Json(Credential { password })) + crate::auth::credentials::create(State(state.pool.clone()), Json(Credential { password })) .await?; - let user = sqlx::query_as!( - User, - "INSERT INTO user_ (id,name,email) VALUES ($1, $2, $3) RETURNING *", - refresh.sub, + let user = User { + id: refresh.sub, name, - email.to_ascii_lowercase(), - ) - .fetch_one(&pool) - .await?; + email, + created_at: OffsetDateTime::now_utc(), + updated_at: OffsetDateTime::now_utc(), + }; + + let tx = state.kv_store.transaction(); + tx.put(refresh.sub, serde_json::to_vec(&user).unwrap()) + .unwrap(); Ok((status, access, refresh, Json(user))) } pub async fn show( Path(uuid): Path, - State(pool): State, + State(state): State, _: AccessClaims, -) -> Result { - sqlx::query_as!(User, "SELECT * FROM user_ WHERE id = $1 LIMIT 1", uuid) - .fetch_optional(&pool) - .await? +) -> Result, Error> { + let tx = state.kv_store.transaction(); + Ok(tx + .get(uuid) + .unwrap() .ok_or_else(|| Error::UserNotFound) + .map(|s| serde_json::from_slice(&s))? .map(Json) + .unwrap()) + //sqlx::query_as!(User, "SELECT * FROM user_ WHERE id = $1 LIMIT 1", uuid) + // .fetch_optional(&pool) + // .await? + // .ok_or_else(|| Error::UserNotFound) + // .map(Json) } #[cfg(test)] @@ -120,7 +119,9 @@ mod tests { async fn test_get_ok_self(pool: PgPool) -> TestResult { setup_test_env(); - let router = Router::new().merge(router()).with_state(AppState { pool }); + let router = Router::new() + .merge(router()) + .with_state(AppState::with_pool(pool, "./rocks.db")); let request = Request::builder() .uri(format!("/users/{}", USER_ID)) @@ -150,7 +151,9 @@ mod tests { async fn test_get_ok_other(pool: PgPool) -> TestResult { setup_test_env(); - let router = Router::new().merge(router()).with_state(AppState { pool }); + let router = Router::new() + .merge(router()) + .with_state(AppState::with_pool(pool, "./rocks.db")); let request = Request::builder() .uri(format!("/users/{}", USER_ID)) @@ -182,7 +185,9 @@ mod tests { async fn test_get_not_found(pool: PgPool) -> TestResult { setup_test_env(); - let router = Router::new().merge(router()).with_state(AppState { pool }); + let router = Router::new() + .merge(router()) + .with_state(AppState::with_pool(pool, "./rocks.db")); let request = Request::builder() .uri(format!("/users/{}", USER_ID)) @@ -203,7 +208,9 @@ mod tests { async fn test_get_unauthorized_invalid_token_format(pool: PgPool) -> TestResult { setup_test_env(); - let router = Router::new().merge(router()).with_state(AppState { pool }); + let router = Router::new() + .merge(router()) + .with_state(AppState::with_pool(pool, "./rocks.db")); let request = Request::builder() .uri(format!("/users/{}", USER_ID)) @@ -221,7 +228,9 @@ mod tests { async fn test_get_unauthorized_missing_token(pool: PgPool) -> TestResult { setup_test_env(); - let router = Router::new().merge(router()).with_state(AppState { pool }); + let router = Router::new() + .merge(router()) + .with_state(AppState::with_pool(pool, "./rocks.db")); let request = Request::builder() .uri(format!("/users/{}", USER_ID)) @@ -238,7 +247,9 @@ mod tests { async fn test_post_created(pool: PgPool) -> TestResult { setup_test_env(); - let router = Router::new().merge(router()).with_state(AppState { pool }); + let router = Router::new() + .merge(router()) + .with_state(AppState::with_pool(pool, "./rocks.db")); let user = serde_json::json!( { "name": USER_NAME, @@ -269,7 +280,9 @@ mod tests { async fn test_post_conflict(pool: PgPool) -> TestResult { setup_test_env(); - let router = Router::new().merge(router()).with_state(AppState { pool }); + let router = Router::new() + .merge(router()) + .with_state(AppState::with_pool(pool, "./rocks.db")); let user = serde_json::json!( { "name": USER_NAME, diff --git a/src/auth.rs b/src/auth.rs index 909534e..3c40661 100644 --- a/src/auth.rs +++ b/src/auth.rs @@ -94,7 +94,9 @@ mod tests { async fn test_issue_ok(pool: PgPool) -> TestResult { setup_test_env(); - let router = Router::new().merge(router()).with_state(AppState { pool }); + let router = Router::new() + .merge(router()) + .with_state(AppState::with_pool(pool, "./rocks.db")); let auth = Authorization::basic( "4c14f795-86f0-4361-a02f-0edb966fb145", @@ -118,7 +120,9 @@ mod tests { async fn test_issue_unauthorized(pool: PgPool) -> TestResult { setup_test_env(); - let router = Router::new().merge(router()).with_state(AppState { pool }); + let router = Router::new() + .merge(router()) + .with_state(AppState::with_pool(pool, "./rocks.db")); let auth = Authorization::basic("4c14f795-86f0-4361-a02f-0edb966fb145", "hunter2"); diff --git a/src/main.rs b/src/main.rs index 71e68f9..31fa7da 100644 --- a/src/main.rs +++ b/src/main.rs @@ -21,7 +21,7 @@ async fn main() -> Result<(), main_error::MainError> { let config = Config::builder().file()?.env().build()?; let listener = TcpListener::bind(config.listen_addr).await?; - let state = AppState::new(config.database_url).await?; + let state = AppState::new(config.database_url, "./rocks.db").await?; let router = unnamed_server::router().with_state(state); tracing::info!("Listening on http://{}", listener.local_addr()?); diff --git a/src/state.rs b/src/state.rs index 771647d..f7fe10e 100644 --- a/src/state.rs +++ b/src/state.rs @@ -1,17 +1,40 @@ -use std::fmt::Debug; +use std::{fmt::Debug, path::Path, sync::Arc}; use axum::extract::FromRef; use sqlx::PgPool; use crate::Error; +pub type KVStore = rocksdb::OptimisticTransactionDB; + #[derive(Debug, Clone, FromRef)] pub struct AppState { pub pool: PgPool, + pub kv_store: Arc>, } impl AppState { - pub async fn new(uri: String) -> Result { + pub async fn new

(uri: String, path: P) -> Result + where + P: AsRef, + { + Ok(Self { + pool: Self::init_pool(uri).await?, + kv_store: Arc::new(Self::init_kv_store(path).unwrap()), + }) + } + + pub fn with_pool

(pool: PgPool, path: P) -> Self + where + P: AsRef, + { + Self { + pool, + kv_store: Arc::new(Self::init_kv_store(path).unwrap()), + } + } + + pub async fn init_pool(uri: String) -> Result { tracing::debug!("Attempting to connect to database..."); let pool = sqlx::postgres::PgPoolOptions::new() @@ -23,6 +46,17 @@ impl AppState { sqlx::migrate!().run(&pool).await?; - Ok(Self { pool }) + Ok(pool) + } + + pub fn init_kv_store

( + path: P, + ) -> Result, rocksdb::Error> + where + P: AsRef, + { + let mut opts = rocksdb::Options::default(); + opts.create_if_missing(true); + rocksdb::OptimisticTransactionDB::open(&opts, path) } } -- cgit v1.2.3-70-g09d2