From 69b2275a0aa9eaff5134777f8a228c2132df02e1 Mon Sep 17 00:00:00 2001 From: Toby Vincent Date: Tue, 7 May 2024 14:30:23 -0500 Subject: refactor(wip): experiment with rocksdb --- Cargo.lock | 157 +++++++++++++++++++++++++++++++++++++++++++++++++++++ Cargo.toml | 1 + src/api.rs | 8 ++- src/api/account.rs | 16 ++++-- src/api/users.rs | 83 ++++++++++++++++------------ src/auth.rs | 8 ++- src/main.rs | 2 +- src/state.rs | 40 +++++++++++++- 8 files changed, 268 insertions(+), 47 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c33e307..6244ee0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -212,6 +212,26 @@ version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" +[[package]] +name = "bindgen" +version = "0.69.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a00dc851838a2120612785d195287475a3ac45514741da670b735818822129a0" +dependencies = [ + "bitflags 2.5.0", + "cexpr", + "clang-sys", + "itertools", + "lazy_static", + "lazycell", + "proc-macro2", + "quote", + "regex", + "rustc-hash", + "shlex", + "syn 2.0.52", +] + [[package]] name = "bitflags" version = "1.3.2" @@ -263,11 +283,35 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a2bd12c1caf447e69cd4528f47f94d203fd2582878ecb9e9465484c4148a8223" +[[package]] +name = "bzip2-sys" +version = "0.1.11+1.0.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "736a955f3fa7875102d57c82b8cac37ec45224a07fd32d58f9f7a186b6cd4cdc" +dependencies = [ + "cc", + "libc", + "pkg-config", +] + [[package]] name = "cc" version = "1.0.90" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8cd6604a82acf3039f1144f54b8eb34e91ffba622051189e71b781822d5ee1f5" +dependencies = [ + "jobserver", + "libc", +] + +[[package]] +name = "cexpr" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6fac387a98bb7c37292057cffc56d62ecb629900026402633ae9160df93a8766" +dependencies = [ + "nom", +] [[package]] name = "cfg-if" @@ -275,6 +319,17 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "clang-sys" +version = "1.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "67523a3b4be3ce1989d607a828d036249522dd9c1c8de7f4dd2dae43a37369d1" +dependencies = [ + "glob", + "libc", + "libloading", +] + [[package]] name = "const-oid" version = "0.9.6" @@ -589,6 +644,12 @@ version = "0.28.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4271d37baee1b8c7e4b708028c57d816cf9d2434acb33a549475f78c181f6253" +[[package]] +name = "glob" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" + [[package]] name = "h2" version = "0.4.3" @@ -816,6 +877,15 @@ version = "1.0.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b1a46d1a171d865aa5f83f92695765caa047a9b4cbae2cbf37dbd613a793fd4c" +[[package]] +name = "jobserver" +version = "0.1.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2b099aaa34a9751c5bf0878add70444e1ed2dd73f347be99003d4577277de6e" +dependencies = [ + "libc", +] + [[package]] name = "js-sys" version = "0.3.69" @@ -849,18 +919,51 @@ dependencies = [ "spin 0.5.2", ] +[[package]] +name = "lazycell" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" + [[package]] name = "libc" version = "0.2.153" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c198f91728a82281a64e1f4f9eeb25d82cb32a5de251c6bd1b5154d63a8e7bd" +[[package]] +name = "libloading" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c2a198fb6b0eada2a8df47933734e6d35d350665a33a3593d7164fa52c75c19" +dependencies = [ + "cfg-if", + "windows-targets 0.52.4", +] + [[package]] name = "libm" version = "0.2.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4ec2a862134d2a7d32d7983ddcdd1c4923530833c9f2ea1a44fc5fa473989058" +[[package]] +name = "librocksdb-sys" +version = "0.16.0+8.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ce3d60bc059831dc1c83903fb45c103f75db65c5a7bf22272764d9cc683e348c" +dependencies = [ + "bindgen", + "bzip2-sys", + "cc", + "glob", + "libc", + "libz-sys", + "lz4-sys", + "pkg-config", + "zstd-sys", +] + [[package]] name = "libsqlite3-sys" version = "0.27.0" @@ -872,6 +975,17 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "libz-sys" +version = "1.1.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5e143b5e666b2695d28f6bca6497720813f699c9602dd7f5cac91008b8ada7f9" +dependencies = [ + "cc", + "pkg-config", + "vcpkg", +] + [[package]] name = "linux-raw-sys" version = "0.4.13" @@ -894,6 +1008,16 @@ version = "0.4.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "90ed8c1e510134f979dbc4f070f87d4313098b704861a105fe34231c70a3901c" +[[package]] +name = "lz4-sys" +version = "1.9.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57d27b317e207b10f69f5e75494119e391a96f48861ae870d1da6edac98ca900" +dependencies = [ + "cc", + "libc", +] + [[package]] name = "main_error" version = "0.1.2" @@ -1342,6 +1466,16 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "rocksdb" +version = "0.22.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6bd13e55d6d7b8cd0ea569161127567cd587676c99f4472f779a0279aa60a7a7" +dependencies = [ + "libc", + "librocksdb-sys", +] + [[package]] name = "rsa" version = "0.9.6" @@ -1368,6 +1502,12 @@ version = "0.1.23" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76" +[[package]] +name = "rustc-hash" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" + [[package]] name = "rustix" version = "0.38.32" @@ -1505,6 +1645,12 @@ dependencies = [ "lazy_static", ] +[[package]] +name = "shlex" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" + [[package]] name = "signal-hook-registry" version = "1.4.1" @@ -2205,6 +2351,7 @@ dependencies = [ "mime", "once_cell", "pgtemp", + "rocksdb", "serde", "serde_json", "sqlx", @@ -2535,3 +2682,13 @@ name = "zeroize" version = "1.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "525b4ec142c6b68a2d10f01f7bbf6755599ca3f81ea53b8431b7dd348f5fdb2d" + +[[package]] +name = "zstd-sys" +version = "2.0.10+zstd.1.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c253a4914af5bafc8fa8c86ee400827e83cf6ec01195ec1f1ed8441bf00d65aa" +dependencies = [ + "cc", + "pkg-config", +] diff --git a/Cargo.toml b/Cargo.toml index 37ef445..a69a4ed 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,6 +15,7 @@ email_address = "0.2.4" jsonwebtoken = "9.3.0" main_error = "0.1.2" once_cell = "1.19.0" +rocksdb = { version = "0.22.0", features = ["multi-threaded-cf", "io-uring"] } serde = { version = "1.0.197", features = ["derive"] } serde_json = "1.0.114" sqlx = { version = "0.7.3", features = ["postgres", "runtime-tokio", "uuid", "time"] } diff --git a/src/api.rs b/src/api.rs index a35fba5..0d3f446 100644 --- a/src/api.rs +++ b/src/api.rs @@ -40,7 +40,9 @@ mod tests { async fn test_healthcheck_ok(pool: PgPool) -> TestResult { setup_test_env(); - let router = Router::new().merge(router()).with_state(AppState { pool }); + let router = Router::new() + .merge(router()) + .with_state(AppState::with_pool(pool, "./rocks.db")); let request = Request::builder().uri("/healthcheck").body(Body::empty())?; @@ -55,7 +57,9 @@ mod tests { async fn test_fallback_not_found(pool: PgPool) -> TestResult { setup_test_env(); - let router = Router::new().merge(router()).with_state(AppState { pool }); + let router = Router::new() + .merge(router()) + .with_state(AppState::with_pool(pool, "./rocks.db")); let request = Request::builder() .uri("/does-not-exist") diff --git a/src/api/account.rs b/src/api/account.rs index bae7c54..4fb2675 100644 --- a/src/api/account.rs +++ b/src/api/account.rs @@ -97,7 +97,9 @@ mod tests { async fn test_login_ok(pool: PgPool) -> TestResult { setup_test_env(); - let router = Router::new().merge(router()).with_state(AppState { pool }); + let router = Router::new() + .merge(router()) + .with_state(AppState::with_pool(pool, "./rocks.db")); let auth = Authorization::basic(USER_EMAIL, USER_PASSWORD); @@ -141,7 +143,9 @@ mod tests { async fn test_login_unauthorized(pool: PgPool) -> TestResult { setup_test_env(); - let router = Router::new().merge(router()).with_state(AppState { pool }); + let router = Router::new() + .merge(router()) + .with_state(AppState::with_pool(pool, "./rocks.db")); let auth = Authorization::basic(USER_EMAIL, "hunter2"); @@ -162,7 +166,9 @@ mod tests { async fn test_login_not_found(pool: PgPool) -> TestResult { setup_test_env(); - let router = Router::new().merge(router()).with_state(AppState { pool }); + let router = Router::new() + .merge(router()) + .with_state(AppState::with_pool(pool, "./rocks.db")); let auth = Authorization::basic(USER_EMAIL, USER_PASSWORD); @@ -183,7 +189,9 @@ mod tests { async fn test_logout_ok(pool: PgPool) -> TestResult { setup_test_env(); - let router = Router::new().merge(router()).with_state(AppState { pool }); + let router = Router::new() + .merge(router()) + .with_state(AppState::with_pool(pool, "./rocks.db")); let request = Request::builder() .uri("/logout") diff --git a/src/api/users.rs b/src/api/users.rs index e07bf7e..24bcf97 100644 --- a/src/api/users.rs +++ b/src/api/users.rs @@ -13,7 +13,7 @@ use uuid::Uuid; use crate::{ auth::{credentials::Credential, AccessClaims}, - state::AppState, + state::{AppState, KVStore}, }; use super::error::Error; @@ -40,54 +40,53 @@ pub struct Registration { } pub async fn create( - State(pool): State, + State(state): State, Json(Registration { name, email, password, }): Json, -) -> impl IntoResponse { +) -> Result { email_address::EmailAddress::from_str(&email)?; - let exists: Option = sqlx::query_scalar!( - "SELECT EXISTS(SELECT 1 FROM user_ WHERE email = $1 LIMIT 1)", - email.to_ascii_lowercase() - ) - .fetch_one(&pool) - .await?; - - if exists.is_some_and(|b| b) { - return Err(Error::EmailExists); - } - // TODO: Move this into a micro service, possibly behind a feature flag. let (status, (access, refresh)) = - crate::auth::credentials::create(State(pool.clone()), Json(Credential { password })) + crate::auth::credentials::create(State(state.pool.clone()), Json(Credential { password })) .await?; - let user = sqlx::query_as!( - User, - "INSERT INTO user_ (id,name,email) VALUES ($1, $2, $3) RETURNING *", - refresh.sub, + let user = User { + id: refresh.sub, name, - email.to_ascii_lowercase(), - ) - .fetch_one(&pool) - .await?; + email, + created_at: OffsetDateTime::now_utc(), + updated_at: OffsetDateTime::now_utc(), + }; + + let tx = state.kv_store.transaction(); + tx.put(refresh.sub, serde_json::to_vec(&user).unwrap()) + .unwrap(); Ok((status, access, refresh, Json(user))) } pub async fn show( Path(uuid): Path, - State(pool): State, + State(state): State, _: AccessClaims, -) -> Result { - sqlx::query_as!(User, "SELECT * FROM user_ WHERE id = $1 LIMIT 1", uuid) - .fetch_optional(&pool) - .await? +) -> Result, Error> { + let tx = state.kv_store.transaction(); + Ok(tx + .get(uuid) + .unwrap() .ok_or_else(|| Error::UserNotFound) + .map(|s| serde_json::from_slice(&s))? .map(Json) + .unwrap()) + //sqlx::query_as!(User, "SELECT * FROM user_ WHERE id = $1 LIMIT 1", uuid) + // .fetch_optional(&pool) + // .await? + // .ok_or_else(|| Error::UserNotFound) + // .map(Json) } #[cfg(test)] @@ -120,7 +119,9 @@ mod tests { async fn test_get_ok_self(pool: PgPool) -> TestResult { setup_test_env(); - let router = Router::new().merge(router()).with_state(AppState { pool }); + let router = Router::new() + .merge(router()) + .with_state(AppState::with_pool(pool, "./rocks.db")); let request = Request::builder() .uri(format!("/users/{}", USER_ID)) @@ -150,7 +151,9 @@ mod tests { async fn test_get_ok_other(pool: PgPool) -> TestResult { setup_test_env(); - let router = Router::new().merge(router()).with_state(AppState { pool }); + let router = Router::new() + .merge(router()) + .with_state(AppState::with_pool(pool, "./rocks.db")); let request = Request::builder() .uri(format!("/users/{}", USER_ID)) @@ -182,7 +185,9 @@ mod tests { async fn test_get_not_found(pool: PgPool) -> TestResult { setup_test_env(); - let router = Router::new().merge(router()).with_state(AppState { pool }); + let router = Router::new() + .merge(router()) + .with_state(AppState::with_pool(pool, "./rocks.db")); let request = Request::builder() .uri(format!("/users/{}", USER_ID)) @@ -203,7 +208,9 @@ mod tests { async fn test_get_unauthorized_invalid_token_format(pool: PgPool) -> TestResult { setup_test_env(); - let router = Router::new().merge(router()).with_state(AppState { pool }); + let router = Router::new() + .merge(router()) + .with_state(AppState::with_pool(pool, "./rocks.db")); let request = Request::builder() .uri(format!("/users/{}", USER_ID)) @@ -221,7 +228,9 @@ mod tests { async fn test_get_unauthorized_missing_token(pool: PgPool) -> TestResult { setup_test_env(); - let router = Router::new().merge(router()).with_state(AppState { pool }); + let router = Router::new() + .merge(router()) + .with_state(AppState::with_pool(pool, "./rocks.db")); let request = Request::builder() .uri(format!("/users/{}", USER_ID)) @@ -238,7 +247,9 @@ mod tests { async fn test_post_created(pool: PgPool) -> TestResult { setup_test_env(); - let router = Router::new().merge(router()).with_state(AppState { pool }); + let router = Router::new() + .merge(router()) + .with_state(AppState::with_pool(pool, "./rocks.db")); let user = serde_json::json!( { "name": USER_NAME, @@ -269,7 +280,9 @@ mod tests { async fn test_post_conflict(pool: PgPool) -> TestResult { setup_test_env(); - let router = Router::new().merge(router()).with_state(AppState { pool }); + let router = Router::new() + .merge(router()) + .with_state(AppState::with_pool(pool, "./rocks.db")); let user = serde_json::json!( { "name": USER_NAME, diff --git a/src/auth.rs b/src/auth.rs index 909534e..3c40661 100644 --- a/src/auth.rs +++ b/src/auth.rs @@ -94,7 +94,9 @@ mod tests { async fn test_issue_ok(pool: PgPool) -> TestResult { setup_test_env(); - let router = Router::new().merge(router()).with_state(AppState { pool }); + let router = Router::new() + .merge(router()) + .with_state(AppState::with_pool(pool, "./rocks.db")); let auth = Authorization::basic( "4c14f795-86f0-4361-a02f-0edb966fb145", @@ -118,7 +120,9 @@ mod tests { async fn test_issue_unauthorized(pool: PgPool) -> TestResult { setup_test_env(); - let router = Router::new().merge(router()).with_state(AppState { pool }); + let router = Router::new() + .merge(router()) + .with_state(AppState::with_pool(pool, "./rocks.db")); let auth = Authorization::basic("4c14f795-86f0-4361-a02f-0edb966fb145", "hunter2"); diff --git a/src/main.rs b/src/main.rs index 71e68f9..31fa7da 100644 --- a/src/main.rs +++ b/src/main.rs @@ -21,7 +21,7 @@ async fn main() -> Result<(), main_error::MainError> { let config = Config::builder().file()?.env().build()?; let listener = TcpListener::bind(config.listen_addr).await?; - let state = AppState::new(config.database_url).await?; + let state = AppState::new(config.database_url, "./rocks.db").await?; let router = unnamed_server::router().with_state(state); tracing::info!("Listening on http://{}", listener.local_addr()?); diff --git a/src/state.rs b/src/state.rs index 771647d..f7fe10e 100644 --- a/src/state.rs +++ b/src/state.rs @@ -1,17 +1,40 @@ -use std::fmt::Debug; +use std::{fmt::Debug, path::Path, sync::Arc}; use axum::extract::FromRef; use sqlx::PgPool; use crate::Error; +pub type KVStore = rocksdb::OptimisticTransactionDB; + #[derive(Debug, Clone, FromRef)] pub struct AppState { pub pool: PgPool, + pub kv_store: Arc>, } impl AppState { - pub async fn new(uri: String) -> Result { + pub async fn new

(uri: String, path: P) -> Result + where + P: AsRef, + { + Ok(Self { + pool: Self::init_pool(uri).await?, + kv_store: Arc::new(Self::init_kv_store(path).unwrap()), + }) + } + + pub fn with_pool

(pool: PgPool, path: P) -> Self + where + P: AsRef, + { + Self { + pool, + kv_store: Arc::new(Self::init_kv_store(path).unwrap()), + } + } + + pub async fn init_pool(uri: String) -> Result { tracing::debug!("Attempting to connect to database..."); let pool = sqlx::postgres::PgPoolOptions::new() @@ -23,6 +46,17 @@ impl AppState { sqlx::migrate!().run(&pool).await?; - Ok(Self { pool }) + Ok(pool) + } + + pub fn init_kv_store

( + path: P, + ) -> Result, rocksdb::Error> + where + P: AsRef, + { + let mut opts = rocksdb::Options::default(); + opts.create_if_missing(true); + rocksdb::OptimisticTransactionDB::open(&opts, path) } } -- cgit v1.2.3-70-g09d2