diff --git a/.sqlx/query-67d494c0b8818b3df09d091400626271156754cabcaad1df7c0d9576b3273a6c.json b/.sqlx/query-67d494c0b8818b3df09d091400626271156754cabcaad1df7c0d9576b3273a6c.json new file mode 100644 index 00000000..c44c773d --- /dev/null +++ b/.sqlx/query-67d494c0b8818b3df09d091400626271156754cabcaad1df7c0d9576b3273a6c.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT balance FROM users WHERE id = $1 FOR UPDATE\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "balance", + "type_info": "Numeric" + } + ], + "parameters": { + "Left": [ + "Int8" + ] + }, + "nullable": [ + false + ] + }, + "hash": "67d494c0b8818b3df09d091400626271156754cabcaad1df7c0d9576b3273a6c" +} diff --git a/.sqlx/query-a911bd1b5d19d305e5dae51941c169cba3afed4b6c7d9b99fc2d6a0db853cc5c.json b/.sqlx/query-a911bd1b5d19d305e5dae51941c169cba3afed4b6c7d9b99fc2d6a0db853cc5c.json new file mode 100644 index 00000000..483cd794 --- /dev/null +++ b/.sqlx/query-a911bd1b5d19d305e5dae51941c169cba3afed4b6c7d9b99fc2d6a0db853cc5c.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT balance FROM users WHERE id = $1 FOR UPDATE\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "balance", + "type_info": "Numeric" + } + ], + "parameters": { + "Left": [ + "Int8" + ] + }, + "nullable": [ + false + ] + }, + "hash": "a911bd1b5d19d305e5dae51941c169cba3afed4b6c7d9b99fc2d6a0db853cc5c" +} diff --git a/src/queue/payouts.rs b/src/queue/payouts.rs index eed9079e..f3570963 100644 --- a/src/queue/payouts.rs +++ b/src/queue/payouts.rs @@ -16,12 +16,11 @@ use serde_json::Value; use sqlx::postgres::PgQueryResult; use sqlx::PgPool; use std::collections::HashMap; -use tokio::sync::{Mutex, RwLock}; +use tokio::sync::RwLock; pub struct PayoutsQueue { credential: RwLock>, payout_options: RwLock>, - pub payouts_locks: Mutex<()>, } #[derive(Clone)] @@ -48,7 +47,6 @@ impl PayoutsQueue { PayoutsQueue { credential: RwLock::new(None), payout_options: RwLock::new(None), - payouts_locks: Mutex::new(()), } } diff --git a/src/routes/v3/payouts.rs b/src/routes/v3/payouts.rs index dc713ee2..197d6d84 100644 --- a/src/routes/v3/payouts.rs +++ b/src/routes/v3/payouts.rs @@ -128,7 +128,14 @@ pub async fn paypal_webhook( .await?; if let Some(result) = result { - let _guard = payouts.payouts_locks.lock().await; + sqlx::query!( + " + SELECT balance FROM users WHERE id = $1 FOR UPDATE + ", + result.user_id + ) + .fetch_optional(&mut *transaction) + .await?; sqlx::query!( " @@ -194,7 +201,6 @@ pub async fn tremendous_webhook( req: HttpRequest, pool: web::Data, redis: web::Data, - payouts: web::Data, body: String, ) -> Result { let signature = req @@ -247,7 +253,14 @@ pub async fn tremendous_webhook( .await?; if let Some(result) = result { - let _guard = payouts.payouts_locks.lock().await; + sqlx::query!( + " + SELECT balance FROM users WHERE id = $1 FOR UPDATE + ", + result.user_id + ) + .fetch_optional(&mut *transaction) + .await?; sqlx::query!( " @@ -367,8 +380,6 @@ pub async fn create_payout( )); } - let _guard = payouts_queue.payouts_locks.lock().await; - if user.balance < body.amount || body.amount < Decimal::ZERO { return Err(ApiError::InvalidInput( "You do not have enough funds to make this payout!".to_string(), @@ -398,6 +409,16 @@ pub async fn create_payout( } let mut transaction = pool.begin().await?; + + sqlx::query!( + " + SELECT balance FROM users WHERE id = $1 FOR UPDATE + ", + user.id.0 + ) + .fetch_optional(&mut *transaction) + .await?; + let payout_id = generate_payout_id(&mut transaction).await?; let payout_item = match body.method {