From 33ea8cd5c9e815b826d9ffdb7f4c601828dbea4e Mon Sep 17 00:00:00 2001 From: Jai A Date: Mon, 9 Oct 2023 22:48:32 -0700 Subject: [PATCH] Switch to trolley for payments --- Cargo.lock | 11 ++- Cargo.toml | 2 +- migrations/20230919183129_trolley.sql | 5 +- src/auth/flows.rs | 83 +++++++++++++++- src/auth/validate.rs | 8 +- src/database/models/user_item.rs | 12 ++- src/models/pats.rs | 2 +- src/models/teams.rs | 2 +- src/models/users.rs | 90 ++++++++++++++++- src/queue/payouts.rs | 17 ++-- src/routes/v2/admin.rs | 137 +++++++++++++++++++++++--- src/routes/v2/users.rs | 60 +++++------ 12 files changed, 363 insertions(+), 66 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index cb780278..b1eaad62 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -597,9 +597,12 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" [[package]] name = "bitflags" -version = "2.3.3" +version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "630be753d4e58660abd17930c71b647fe46c27ea6b63cc59e1e3851406972e42" +checksum = "b4682ae6287fcf752ecaabbfcc7b6f9b72aa33933dc23a554d853aea8eea8635" +dependencies = [ + "serde", +] [[package]] name = "bitvec" @@ -2237,7 +2240,7 @@ dependencies = [ "argon2", "async-trait", "base64 0.21.2", - "bitflags 1.3.2", + "bitflags 2.4.0", "bytes", "censor", "chrono", @@ -3596,7 +3599,7 @@ version = "0.38.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ac5ffa1efe7548069688cd7028f32591853cd7b5b756d41bcffd2353e4fc75b4" dependencies = [ - "bitflags 2.3.3", + "bitflags 2.4.0", "errno", "libc", "linux-raw-sys 0.4.3", diff --git a/Cargo.toml b/Cargo.toml index 755adc38..d413c6b6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -50,7 +50,7 @@ sha1 = { version = "0.6.1", features = ["std"] } sha2 = "0.9.9" hmac = "0.11.0" argon2 = { version = "0.5.0", features = ["std"] } -bitflags = "1.3.2" +bitflags = { version = "2.4.0", features = ["serde"] } hex = "0.4.3" zxcvbn = "2.2.2" totp-rs = { version = "5.0.2", features = ["gen_secret"] } diff --git a/migrations/20230919183129_trolley.sql b/migrations/20230919183129_trolley.sql index 57b41b82..8dfb148f 100644 --- a/migrations/20230919183129_trolley.sql +++ b/migrations/20230919183129_trolley.sql @@ -10,4 +10,7 @@ ALTER TABLE users ALTER TABLE historical_payouts ADD COLUMN batch_id text NULL, - ADD COLUMN payment_id text NULL; \ No newline at end of file + ADD COLUMN payment_id text NULL; + +UPDATE historical_payouts +SET status = 'processed' \ No newline at end of file diff --git a/src/auth/flows.rs b/src/auth/flows.rs index 86c1fa50..c8128eef 100644 --- a/src/auth/flows.rs +++ b/src/auth/flows.rs @@ -7,8 +7,9 @@ use crate::file_hosting::FileHost; use crate::models::ids::base62_impl::{parse_base62, to_base62}; use crate::models::ids::random_base62_rng; use crate::models::pats::Scopes; -use crate::models::users::{Badges, Role}; +use crate::models::users::{Badges, RecipientStatus, Role}; use crate::parse_strings_from_var; +use crate::queue::payouts::{AccountUser, PayoutsQueue}; use crate::queue::session::AuthQueue; use crate::queue::socket::ActiveSockets; use crate::routes::ApiError; @@ -29,7 +30,7 @@ use serde::{Deserialize, Serialize}; use sqlx::postgres::PgPool; use std::collections::HashMap; use std::sync::Arc; -use tokio::sync::RwLock; +use tokio::sync::{Mutex, RwLock}; use validator::Validate; pub fn config(cfg: &mut ServiceConfig) { @@ -50,7 +51,8 @@ pub fn config(cfg: &mut ServiceConfig) { .service(resend_verify_email) .service(set_email) .service(verify_email) - .service(subscribe_newsletter), + .service(subscribe_newsletter) + .service(link_trolley), ); } @@ -224,6 +226,8 @@ impl TempUser { role: Role::Developer.to_string(), badges: Badges::default(), balance: Decimal::ZERO, + trolley_id: None, + trolley_account_status: None, } .insert(transaction) .await?; @@ -1009,7 +1013,7 @@ pub async fn auth_callback( let sockets = active_sockets.clone(); let state = state_string.clone(); - let res: Result = (|| async move { + let res: Result = async move { let flow = Flow::get(&state, &redis).await?; @@ -1171,7 +1175,7 @@ pub async fn auth_callback( } else { Err::(AuthenticationError::InvalidCredentials) } - })().await; + }.await; // Because this is callback route, if we have an error, we need to ensure we close the original socket if it exists if let Err(ref e) = res { @@ -1381,6 +1385,8 @@ pub async fn create_account_with_password( role: Role::Developer.to_string(), badges: Badges::default(), balance: Decimal::ZERO, + trolley_id: None, + trolley_account_status: None, } .insert(&mut transaction) .await?; @@ -2004,6 +2010,7 @@ pub async fn set_email( redis: Data, email: web::Json, session_queue: Data, + payouts_queue: Data>, ) -> Result { email .0 @@ -2057,6 +2064,15 @@ pub async fn set_email( "We need to verify your email address.", )?; + if let Some(payout_data) = user.payout_data { + if let Some(trolley_id) = payout_data.trolley_id { + let queue = payouts_queue.lock().await; + queue + .update_recipient_email(&trolley_id, &email.email) + .await?; + } + } + crate::database::models::User::clear_caches(&[(user.id.into(), None)], &redis).await?; transaction.commit().await?; @@ -2199,3 +2215,60 @@ fn send_email_verify( Some(("Verify email", &format!("{}/{}?flow={}", dotenvy::var("SITE_URL")?, dotenvy::var("SITE_VERIFY_EMAIL_PATH")?, flow))), ) } + +#[post("trolley/link")] +pub async fn link_trolley( + req: HttpRequest, + pool: Data, + redis: Data, + session_queue: Data, + payouts_queue: Data>, + body: web::Json, +) -> Result { + let user = get_user_from_headers( + &req, + &**pool, + &redis, + &session_queue, + Some(&[Scopes::PAYOUTS_WRITE]), + ) + .await? + .1; + + if let Some(payout_data) = user.payout_data { + if payout_data.trolley_id.is_some() { + return Err(ApiError::InvalidInput( + "User already has a trolley account.".to_string(), + )); + } + } + + if let Some(email) = user.email { + let payouts = payouts_queue.lock().await; + let id = payouts.register_recipient(&email, body.0).await?; + + let mut transaction = pool.begin().await?; + + sqlx::query!( + " + UPDATE users + SET trolley_id = $1, trolley_account_status = $2 + WHERE id = $3 + ", + id, + RecipientStatus::Incomplete.as_str(), + user.id.0 as i64, + ) + .execute(&mut transaction) + .await?; + + transaction.commit().await?; + crate::database::models::User::clear_caches(&[(user.id.into(), None)], &redis).await?; + + Ok(HttpResponse::NoContent().finish()) + } else { + Err(ApiError::InvalidInput( + "User needs to have an email set on account.".to_string(), + )) + } +} diff --git a/src/auth/validate.rs b/src/auth/validate.rs index dfa1854c..58e01e3a 100644 --- a/src/auth/validate.rs +++ b/src/auth/validate.rs @@ -3,7 +3,7 @@ use crate::auth::session::get_session_metadata; use crate::auth::AuthenticationError; use crate::database::models::user_item; use crate::models::pats::Scopes; -use crate::models::users::{Role, User, UserId}; +use crate::models::users::{Role, User, UserId, UserPayoutData}; use crate::queue::session::AuthQueue; use actix_web::HttpRequest; use chrono::Utc; @@ -60,7 +60,11 @@ where has_password: Some(db_user.password.is_some()), has_totp: Some(db_user.totp_secret.is_some()), github_id: None, - payout_data: None, + payout_data: Some(UserPayoutData { + balance: db_user.balance, + trolley_id: db_user.trolley_id, + trolley_status: db_user.trolley_account_status, + }), }; if let Some(required_scopes) = required_scopes { diff --git a/src/database/models/user_item.rs b/src/database/models/user_item.rs index 9e5846a7..d492fe41 100644 --- a/src/database/models/user_item.rs +++ b/src/database/models/user_item.rs @@ -1,7 +1,7 @@ use super::ids::{ProjectId, UserId}; use crate::database::models::DatabaseError; use crate::models::ids::base62_impl::{parse_base62, to_base62}; -use crate::models::users::Badges; +use crate::models::users::{Badges, RecipientStatus}; use chrono::{DateTime, Utc}; use redis::cmd; use rust_decimal::Decimal; @@ -35,7 +35,10 @@ pub struct User { pub created: DateTime, pub role: String, pub badges: Badges, + pub balance: Decimal, + pub trolley_id: Option, + pub trolley_account_status: Option, } impl User { @@ -205,7 +208,7 @@ impl User { created, role, badges, balance, github_id, discord_id, gitlab_id, google_id, steam_id, microsoft_id, - email_verified, password, totp_secret + email_verified, password, totp_secret, trolley_id, trolley_account_status FROM users WHERE id = ANY($1) OR LOWER(username) = ANY($2) ", @@ -237,6 +240,11 @@ impl User { balance: u.balance, password: u.password, totp_secret: u.totp_secret, + trolley_id: u.trolley_id, + trolley_account_status: u + .trolley_account_status + .as_ref() + .map(|x| RecipientStatus::from_string(x)), })) }) .try_collect::>() diff --git a/src/models/pats.rs b/src/models/pats.rs index 429a64f7..09d965ef 100644 --- a/src/models/pats.rs +++ b/src/models/pats.rs @@ -10,7 +10,7 @@ use serde::{Deserialize, Serialize}; pub struct PatId(pub u64); bitflags::bitflags! { - #[derive(Serialize, Deserialize)] + #[derive(Serialize, Deserialize, Copy, Clone)] #[serde(transparent)] pub struct Scopes: u64 { // read a user's email diff --git a/src/models/teams.rs b/src/models/teams.rs index 346e2505..67d86691 100644 --- a/src/models/teams.rs +++ b/src/models/teams.rs @@ -22,7 +22,7 @@ pub struct Team { } bitflags::bitflags! { - #[derive(Serialize, Deserialize)] + #[derive(Serialize, Deserialize, Copy, Clone)] #[serde(transparent)] pub struct Permissions: u64 { const UPLOAD_VERSION = 1 << 0; diff --git a/src/models/users.rs b/src/models/users.rs index 5cdd7232..15e2b6ed 100644 --- a/src/models/users.rs +++ b/src/models/users.rs @@ -12,7 +12,7 @@ pub struct UserId(pub u64); pub const DELETED_USER: UserId = UserId(127155982985829); bitflags::bitflags! { - #[derive(Serialize, Deserialize)] + #[derive(Serialize, Deserialize, Copy, Clone, Debug)] #[serde(transparent)] pub struct Badges: u64 { // 1 << 0 unused - ignore + replace with something later @@ -61,7 +61,7 @@ pub struct User { pub struct UserPayoutData { pub balance: Decimal, pub trolley_id: Option, - pub trolley_status: Option, + pub trolley_status: Option, } use crate::database::models::user_item::User as DBUser; @@ -132,3 +132,89 @@ impl Role { } } } + +#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)] +#[serde(rename_all = "lowercase")] +pub enum RecipientStatus { + Active, + Incomplete, + Disabled, + Archived, + Suspended, + Blocked, +} + +impl RecipientStatus { + pub fn from_string(string: &str) -> RecipientStatus { + match string { + "active" => RecipientStatus::Active, + "incomplete" => RecipientStatus::Incomplete, + "disabled" => RecipientStatus::Disabled, + "archived" => RecipientStatus::Archived, + "suspended" => RecipientStatus::Suspended, + "blocked" => RecipientStatus::Blocked, + _ => RecipientStatus::Disabled, + } + } + + pub fn as_str(&self) -> &'static str { + match self { + RecipientStatus::Active => "active", + RecipientStatus::Incomplete => "incomplete", + RecipientStatus::Disabled => "disabled", + RecipientStatus::Archived => "archived", + RecipientStatus::Suspended => "suspended", + RecipientStatus::Blocked => "blocked", + } + } +} + +#[derive(Serialize)] +pub struct Payout { + pub created: DateTime, + pub amount: Decimal, + pub status: PayoutStatus, +} + +#[derive(Serialize, Deserialize, PartialEq, Eq, Clone)] +#[serde(rename_all = "lowercase")] +pub enum PayoutStatus { + Pending, + Failed, + Processed, + Returned, + Processing, +} + +impl PayoutStatus { + pub fn from_string(string: &str) -> PayoutStatus { + match string { + "pending" => PayoutStatus::Pending, + "failed" => PayoutStatus::Failed, + "processed" => PayoutStatus::Processed, + "returned" => PayoutStatus::Returned, + "processing" => PayoutStatus::Processing, + _ => PayoutStatus::Processing, + } + } + + pub fn as_str(&self) -> &'static str { + match self { + PayoutStatus::Pending => "pending", + PayoutStatus::Failed => "failed", + PayoutStatus::Processed => "processed", + PayoutStatus::Returned => "returned", + PayoutStatus::Processing => "processing", + } + } + + pub fn is_failed(&self) -> bool { + match self { + PayoutStatus::Pending => false, + PayoutStatus::Failed => true, + PayoutStatus::Processed => false, + PayoutStatus::Returned => true, + PayoutStatus::Processing => false, + } + } +} diff --git a/src/queue/payouts.rs b/src/queue/payouts.rs index caf48245..f0c3925e 100644 --- a/src/queue/payouts.rs +++ b/src/queue/payouts.rs @@ -1,14 +1,11 @@ use crate::models::projects::MonetizationStatus; use crate::routes::ApiError; use crate::util::env::parse_var; -use actix::Recipient; -use base64::Engine; use chrono::{DateTime, Datelike, Duration, Utc, Weekday}; use hex::ToHex; use hmac::{Hmac, Mac, NewMac}; use reqwest::Method; use rust_decimal::Decimal; -use s3::creds::time::macros::time; use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; use serde_json::{json, Value}; @@ -21,6 +18,8 @@ pub struct PayoutsQueue { secret_key: String, } +#[derive(Clone, Deserialize)] +#[serde(tag = "type", rename_all = "snake_case")] pub enum AccountUser { Business { name: String }, Individual { first: String, last: String }, @@ -84,8 +83,10 @@ impl PayoutsQueue { ApiError::Payments("could not retrieve Trolley response body".to_string()) })?; + println!("{}", serde_json::to_string(&value)?); + if let Some(obj) = value.as_object() { - if !obj.get("ok").map(|x| x.as_bool()).flatten().unwrap_or(true) { + if !obj.get("ok").and_then(|x| x.as_bool()).unwrap_or(true) { #[derive(Deserialize)] struct TrolleyError { field: Option, @@ -109,11 +110,11 @@ impl PayoutsQueue { })); } } - } - return Err(ApiError::Payments( - "could not retrieve Trolley error body".to_string(), - )); + return Err(ApiError::Payments( + "could not retrieve Trolley error body".to_string(), + )); + } } Ok(serde_json::from_value(value)?) diff --git a/src/routes/v2/admin.rs b/src/routes/v2/admin.rs index ffb5156e..21b2ec68 100644 --- a/src/routes/v2/admin.rs +++ b/src/routes/v2/admin.rs @@ -1,7 +1,9 @@ use crate::auth::validate::get_user_record_from_bearer_token; +use crate::database::models::User; use crate::models::analytics::Download; use crate::models::ids::ProjectId; use crate::models::pats::Scopes; +use crate::models::users::{PayoutStatus, RecipientStatus}; use crate::queue::analytics::AnalyticsQueue; use crate::queue::maxmind::MaxMindIndexer; use crate::queue::session::AuthQueue; @@ -171,7 +173,7 @@ pub async fn trolley_webhook( if let Some(signature) = req.headers().get("X-PaymentRails-Signature") { let payload = read_from_payload( &mut payload, - 1 * (1 << 20), + 1 << 20, "Webhook payload exceeds the maximum of 1MiB.", ) .await?; @@ -179,11 +181,11 @@ pub async fn trolley_webhook( let mut signature = signature.to_str().ok().unwrap_or_default().split(','); let timestamp = signature .next() - .and_then(|x| x.split('=').skip(1).next()) + .and_then(|x| x.split('=').nth(1)) .unwrap_or_default(); let v1 = signature .next() - .and_then(|x| x.split('=').skip(1).next()) + .and_then(|x| x.split('=').nth(1)) .unwrap_or_default(); let mut mac: Hmac = @@ -196,19 +198,132 @@ pub async fn trolley_webhook( if &*request_signature == v1 { let webhook = serde_json::from_slice::(&payload)?; + println!( + "webhook: {} {} {:?}", + webhook.action, webhook.model, webhook.body + ); + + println!("{}", serde_json::to_string(&webhook.body)?); + if webhook.model == "recipient" { - // todo: update email + recipient status + #[derive(Deserialize)] + struct Recipient { + pub id: String, + pub email: Option, + pub status: Option, + } + + if let Some(body) = webhook.body.get("recipient") { + if let Ok(recipient) = serde_json::from_value::(body.clone()) { + let value = sqlx::query!( + "SELECT id FROM users WHERE trolley_id = $1", + recipient.id + ) + .fetch_optional(&**pool) + .await?; + + if let Some(user) = value { + let user = User::get_id( + crate::database::models::UserId(user.id), + &**pool, + &redis, + ) + .await?; + + if let Some(user) = user { + let mut transaction = pool.begin().await?; + + if webhook.action == "deleted" { + sqlx::query!( + " + UPDATE users + SET trolley_account_status = NULL, trolley_id = NULL + WHERE id = $1 + ", + user.id.0 + ) + .execute(&mut transaction) + .await?; + } else { + sqlx::query!( + " + UPDATE users + SET email = $1, email_verified = $2, trolley_account_status = $3 + WHERE id = $4 + ", + recipient.email.clone(), + user.email_verified && recipient.email == user.email, + recipient.status.map(|x| x.as_str()), + user.id.0 + ) + .execute(&mut transaction).await?; + } + + transaction.commit().await?; + User::clear_caches(&[(user.id, None)], &redis).await?; + } + } + } + } } if webhook.model == "payment" { - // todo: update payment status - // if new payment status is failed/returned, return money to modrinth balance - } + #[derive(Deserialize)] + struct Payment { + pub id: String, + pub status: PayoutStatus, + } - println!( - "webhook: {} {} {:?}", - webhook.action, webhook.model, webhook.body - ); + if let Some(body) = webhook.body.get("payment") { + if let Ok(payment) = serde_json::from_value::(body.clone()) { + let value = sqlx::query!( + "SELECT id, amount, user_id, status FROM historical_payouts WHERE payment_id = $1", + payment.id + ) + .fetch_optional(&**pool) + .await?; + + if let Some(payout) = value { + let mut transaction = pool.begin().await?; + + if payment.status.is_failed() + && !PayoutStatus::from_string(&payout.status).is_failed() + { + sqlx::query!( + " + UPDATE users + SET balance = balance + $1 + WHERE id = $2 + ", + payout.amount, + payout.user_id, + ) + .execute(&mut transaction) + .await?; + } + + sqlx::query!( + " + UPDATE historical_payouts + SET status = $1 + WHERE payment_id = $2 + ", + payment.status.as_str(), + payment.id, + ) + .execute(&mut transaction) + .await?; + + transaction.commit().await?; + User::clear_caches( + &[(crate::database::models::UserId(payout.user_id), None)], + &redis, + ) + .await?; + } + } + } + } } } diff --git a/src/routes/v2/users.rs b/src/routes/v2/users.rs index c025a88b..d652562f 100644 --- a/src/routes/v2/users.rs +++ b/src/routes/v2/users.rs @@ -1,17 +1,16 @@ -use crate::auth::{get_user_from_headers, AuthenticationError}; +use crate::auth::get_user_from_headers; use crate::database::models::User; use crate::file_hosting::FileHost; use crate::models::notifications::Notification; use crate::models::pats::Scopes; use crate::models::projects::Project; -use crate::models::users::{Badges, Role, UserId}; +use crate::models::users::{Badges, Payout, PayoutStatus, RecipientStatus, Role, UserId}; use crate::queue::payouts::PayoutsQueue; use crate::queue::session::AuthQueue; use crate::routes::ApiError; use crate::util::routes::read_from_payload; use crate::util::validate::validation_errors_to_string; use actix_web::{delete, get, patch, post, web, HttpRequest, HttpResponse}; -use chrono::{DateTime, Utc}; use lazy_static::lazy_static; use regex::Regex; use rust_decimal::Decimal; @@ -192,7 +191,7 @@ pub async fn user_edit( redis: web::Data, session_queue: web::Data, ) -> Result { - let (scopes, user) = get_user_from_headers( + let (_scopes, user) = get_user_from_headers( &req, &**pool, &redis, @@ -566,13 +565,6 @@ pub async fn user_notifications( } } -#[derive(Serialize)] -pub struct Payout { - pub created: DateTime, - pub amount: Decimal, - pub status: String, -} - #[get("{id}/payouts")] pub async fn user_payouts( req: HttpRequest, @@ -632,7 +624,7 @@ pub async fn user_payouts( Ok(e.right().map(|row| Payout { created: row.created, amount: row.amount, - status: row.status, + status: PayoutStatus::from_string(&row.status), })) }) .try_collect::>(), @@ -689,28 +681,39 @@ pub async fn user_payouts_request( if let Some(payouts_data) = user.payout_data { if let Some(trolley_id) = payouts_data.trolley_id { if let Some(trolley_status) = payouts_data.trolley_status { - if trolley_status == "active" { + if trolley_status == RecipientStatus::Active { return if data.amount < payouts_data.balance { let mut transaction = pool.begin().await?; - let (batch_id, payment_id) = payouts_queue - .send_payout(&trolley_id, data.amount) - .await?; + let (batch_id, payment_id) = + payouts_queue.send_payout(&trolley_id, data.amount).await?; sqlx::query!( - " - INSERT INTO historical_payouts (user_id, amount, status, batch_id, payment_id) - VALUES ($1, $2, $3, $4, $5) - ", - id as crate::database::models::ids::UserId, - data.amount, - "processing", - batch_id, - payment_id, - ) + " + INSERT INTO historical_payouts (user_id, amount, status, batch_id, payment_id) + VALUES ($1, $2, $3, $4, $5) + ", + id as crate::database::models::ids::UserId, + data.amount, + "processing", + batch_id, + payment_id, + ) .execute(&mut *transaction) .await?; + sqlx::query!( + " + UPDATE users + SET balance = balance - $1 + WHERE id = $2 + ", + data.amount, + id as crate::database::models::ids::UserId + ) + .execute(&mut *transaction) + .await?; + User::clear_caches(&[(id, None)], &redis).await?; transaction.commit().await?; @@ -723,8 +726,9 @@ pub async fn user_payouts_request( }; } else { return Err(ApiError::InvalidInput( - "Please complete payout information via the trolley dashboard!".to_string(), - )) + "Please complete payout information via the trolley dashboard!" + .to_string(), + )); } } }