From 92f503eeda5ab1ede90e8714ac66c6ee0da506af Mon Sep 17 00:00:00 2001 From: Jai A Date: Thu, 12 Sep 2024 14:25:27 -0700 Subject: [PATCH 1/3] Integrate with Aditude API for payouts --- .env | 6 +- ...a5b96c8cdfe0a9f9c00e5c67e6b95a33c8c6b.json | 22 ++ ...f7945741f915d4ae657363fe67db46e8bd4cf.json | 22 ++ ...2c6d0c7fb15e375d734bf34c365e71d623780.json | 28 ++ ...5adc8115925320edc35d189bf177ad2b7317a.json | 20 ++ ...6d90c9bb2bd94b6d8accb3d2e0906bb289798.json | 15 - ...2075dd09094b44d2aea2910011eb56778ee0.json} | 36 +- ...26271156754cabcaad1df7c0d9576b3273a6c.json | 22 -- ...68989eced49b475c0bbab90b21908ae0e77b4.json | 15 - ...b0c1930289eb797cf340d961ac69d2c2ceba.json} | 5 +- ...5549f0899689e857634cfc5d85bd7b8718c46.json | 15 - migrations/20240911044738_payouts-updates.sql | 2 + src/auth/validate.rs | 3 +- src/database/models/user_item.rs | 5 - src/lib.rs | 8 +- src/queue/payouts.rs | 178 ++++++---- src/routes/internal/flows.rs | 3 - src/routes/v3/payouts.rs | 311 ++++++++++++++---- 18 files changed, 476 insertions(+), 240 deletions(-) create mode 100644 .sqlx/query-0379424a41b12db94c7734086fca5b96c8cdfe0a9f9c00e5c67e6b95a33c8c6b.json create mode 100644 .sqlx/query-0a31f7b04f4b68c556bdbfe373ef7945741f915d4ae657363fe67db46e8bd4cf.json create mode 100644 .sqlx/query-0bd68c1b7c90ddcdde8c8bbd8362c6d0c7fb15e375d734bf34c365e71d623780.json create mode 100644 .sqlx/query-1280600bf1bf7b4f0d19d0de0ca5adc8115925320edc35d189bf177ad2b7317a.json delete mode 100644 .sqlx/query-3db83286f5aea07f399db451bfd6d90c9bb2bd94b6d8accb3d2e0906bb289798.json rename .sqlx/{query-c4e7adb61382e0422439120f9a6a4388ab4ec25c0d81c2d5809cf011e49d0a6c.json => query-5cce25ecda748f570de563bd3b312075dd09094b44d2aea2910011eb56778ee0.json} (83%) delete mode 100644 .sqlx/query-67d494c0b8818b3df09d091400626271156754cabcaad1df7c0d9576b3273a6c.json delete mode 100644 .sqlx/query-e1c24a57013cbc64f463d3a49cb68989eced49b475c0bbab90b21908ae0e77b4.json rename .sqlx/{query-1c30a8a31b031f194f70dc2a3bac8e131513dd7e9d2c46432ca797f6422c6ecf.json => query-fa5b05775f18d1268bbeece1f5f1b0c1930289eb797cf340d961ac69d2c2ceba.json} (52%) delete mode 100644 .sqlx/query-ff474fba4d18f7788b1a1900a6e5549f0899689e857634cfc5d85bd7b8718c46.json create mode 100644 migrations/20240911044738_payouts-updates.sql diff --git a/.env b/.env index 645b8b18..d24a0d20 100644 --- a/.env +++ b/.env @@ -98,9 +98,9 @@ CLICKHOUSE_DATABASE=staging_ariadne MAXMIND_LICENSE_KEY=none -PAYOUTS_BUDGET=100 - FLAME_ANVIL_URL=none STRIPE_API_KEY=none -STRIPE_WEBHOOK_SECRET=none \ No newline at end of file +STRIPE_WEBHOOK_SECRET=none + +ADITUDE_API_KEY=none \ No newline at end of file diff --git a/.sqlx/query-0379424a41b12db94c7734086fca5b96c8cdfe0a9f9c00e5c67e6b95a33c8c6b.json b/.sqlx/query-0379424a41b12db94c7734086fca5b96c8cdfe0a9f9c00e5c67e6b95a33c8c6b.json new file mode 100644 index 00000000..d3188818 --- /dev/null +++ b/.sqlx/query-0379424a41b12db94c7734086fca5b96c8cdfe0a9f9c00e5c67e6b95a33c8c6b.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT SUM(amount)\n FROM payouts_values\n WHERE user_id = $1 AND date_available > NOW()\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "sum", + "type_info": "Numeric" + } + ], + "parameters": { + "Left": [ + "Int8" + ] + }, + "nullable": [ + null + ] + }, + "hash": "0379424a41b12db94c7734086fca5b96c8cdfe0a9f9c00e5c67e6b95a33c8c6b" +} diff --git a/.sqlx/query-0a31f7b04f4b68c556bdbfe373ef7945741f915d4ae657363fe67db46e8bd4cf.json b/.sqlx/query-0a31f7b04f4b68c556bdbfe373ef7945741f915d4ae657363fe67db46e8bd4cf.json new file mode 100644 index 00000000..07ef85af --- /dev/null +++ b/.sqlx/query-0a31f7b04f4b68c556bdbfe373ef7945741f915d4ae657363fe67db46e8bd4cf.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT SUM(amount)\n FROM payouts_values\n WHERE user_id = $1 AND date_available <= NOW()\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "sum", + "type_info": "Numeric" + } + ], + "parameters": { + "Left": [ + "Int8" + ] + }, + "nullable": [ + null + ] + }, + "hash": "0a31f7b04f4b68c556bdbfe373ef7945741f915d4ae657363fe67db46e8bd4cf" +} diff --git a/.sqlx/query-0bd68c1b7c90ddcdde8c8bbd8362c6d0c7fb15e375d734bf34c365e71d623780.json b/.sqlx/query-0bd68c1b7c90ddcdde8c8bbd8362c6d0c7fb15e375d734bf34c365e71d623780.json new file mode 100644 index 00000000..0c3a38d4 --- /dev/null +++ b/.sqlx/query-0bd68c1b7c90ddcdde8c8bbd8362c6d0c7fb15e375d734bf34c365e71d623780.json @@ -0,0 +1,28 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT SUM(amount) amount, SUM(fee) fee\n FROM payouts\n WHERE user_id = $1 AND (status = 'success' OR status = 'in-transit')\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "amount", + "type_info": "Numeric" + }, + { + "ordinal": 1, + "name": "fee", + "type_info": "Numeric" + } + ], + "parameters": { + "Left": [ + "Int8" + ] + }, + "nullable": [ + null, + null + ] + }, + "hash": "0bd68c1b7c90ddcdde8c8bbd8362c6d0c7fb15e375d734bf34c365e71d623780" +} diff --git a/.sqlx/query-1280600bf1bf7b4f0d19d0de0ca5adc8115925320edc35d189bf177ad2b7317a.json b/.sqlx/query-1280600bf1bf7b4f0d19d0de0ca5adc8115925320edc35d189bf177ad2b7317a.json new file mode 100644 index 00000000..a4888342 --- /dev/null +++ b/.sqlx/query-1280600bf1bf7b4f0d19d0de0ca5adc8115925320edc35d189bf177ad2b7317a.json @@ -0,0 +1,20 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT SUM(amount) from payouts_values\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "sum", + "type_info": "Numeric" + } + ], + "parameters": { + "Left": [] + }, + "nullable": [ + null + ] + }, + "hash": "1280600bf1bf7b4f0d19d0de0ca5adc8115925320edc35d189bf177ad2b7317a" +} diff --git a/.sqlx/query-3db83286f5aea07f399db451bfd6d90c9bb2bd94b6d8accb3d2e0906bb289798.json b/.sqlx/query-3db83286f5aea07f399db451bfd6d90c9bb2bd94b6d8accb3d2e0906bb289798.json deleted file mode 100644 index 795800ff..00000000 --- a/.sqlx/query-3db83286f5aea07f399db451bfd6d90c9bb2bd94b6d8accb3d2e0906bb289798.json +++ /dev/null @@ -1,15 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n UPDATE users\n SET balance = balance - $1\n WHERE id = $2\n ", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Numeric", - "Int8" - ] - }, - "nullable": [] - }, - "hash": "3db83286f5aea07f399db451bfd6d90c9bb2bd94b6d8accb3d2e0906bb289798" -} diff --git a/.sqlx/query-c4e7adb61382e0422439120f9a6a4388ab4ec25c0d81c2d5809cf011e49d0a6c.json b/.sqlx/query-5cce25ecda748f570de563bd3b312075dd09094b44d2aea2910011eb56778ee0.json similarity index 83% rename from .sqlx/query-c4e7adb61382e0422439120f9a6a4388ab4ec25c0d81c2d5809cf011e49d0a6c.json rename to .sqlx/query-5cce25ecda748f570de563bd3b312075dd09094b44d2aea2910011eb56778ee0.json index 3cd8992b..b97a6358 100644 --- a/.sqlx/query-c4e7adb61382e0422439120f9a6a4388ab4ec25c0d81c2d5809cf011e49d0a6c.json +++ b/.sqlx/query-5cce25ecda748f570de563bd3b312075dd09094b44d2aea2910011eb56778ee0.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n SELECT id, email,\n avatar_url, raw_avatar_url, username, bio,\n created, role, badges,\n balance,\n github_id, discord_id, gitlab_id, google_id, steam_id, microsoft_id,\n email_verified, password, totp_secret, paypal_id, paypal_country, paypal_email,\n venmo_handle, stripe_customer_id\n FROM users\n WHERE id = ANY($1) OR LOWER(username) = ANY($2)\n ", + "query": "\n SELECT id, email,\n avatar_url, raw_avatar_url, username, bio,\n created, role, badges,\n github_id, discord_id, gitlab_id, google_id, steam_id, microsoft_id,\n email_verified, password, totp_secret, paypal_id, paypal_country, paypal_email,\n venmo_handle, stripe_customer_id\n FROM users\n WHERE id = ANY($1) OR LOWER(username) = ANY($2)\n ", "describe": { "columns": [ { @@ -50,76 +50,71 @@ }, { "ordinal": 9, - "name": "balance", - "type_info": "Numeric" - }, - { - "ordinal": 10, "name": "github_id", "type_info": "Int8" }, { - "ordinal": 11, + "ordinal": 10, "name": "discord_id", "type_info": "Int8" }, { - "ordinal": 12, + "ordinal": 11, "name": "gitlab_id", "type_info": "Int8" }, { - "ordinal": 13, + "ordinal": 12, "name": "google_id", "type_info": "Varchar" }, { - "ordinal": 14, + "ordinal": 13, "name": "steam_id", "type_info": "Int8" }, { - "ordinal": 15, + "ordinal": 14, "name": "microsoft_id", "type_info": "Varchar" }, { - "ordinal": 16, + "ordinal": 15, "name": "email_verified", "type_info": "Bool" }, { - "ordinal": 17, + "ordinal": 16, "name": "password", "type_info": "Text" }, { - "ordinal": 18, + "ordinal": 17, "name": "totp_secret", "type_info": "Varchar" }, { - "ordinal": 19, + "ordinal": 18, "name": "paypal_id", "type_info": "Text" }, { - "ordinal": 20, + "ordinal": 19, "name": "paypal_country", "type_info": "Text" }, { - "ordinal": 21, + "ordinal": 20, "name": "paypal_email", "type_info": "Text" }, { - "ordinal": 22, + "ordinal": 21, "name": "venmo_handle", "type_info": "Text" }, { - "ordinal": 23, + "ordinal": 22, "name": "stripe_customer_id", "type_info": "Text" } @@ -140,7 +135,6 @@ false, false, false, - false, true, true, true, @@ -157,5 +151,5 @@ true ] }, - "hash": "c4e7adb61382e0422439120f9a6a4388ab4ec25c0d81c2d5809cf011e49d0a6c" + "hash": "5cce25ecda748f570de563bd3b312075dd09094b44d2aea2910011eb56778ee0" } diff --git a/.sqlx/query-67d494c0b8818b3df09d091400626271156754cabcaad1df7c0d9576b3273a6c.json b/.sqlx/query-67d494c0b8818b3df09d091400626271156754cabcaad1df7c0d9576b3273a6c.json deleted file mode 100644 index c44c773d..00000000 --- a/.sqlx/query-67d494c0b8818b3df09d091400626271156754cabcaad1df7c0d9576b3273a6c.json +++ /dev/null @@ -1,22 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n SELECT balance FROM users WHERE id = $1 FOR UPDATE\n ", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "balance", - "type_info": "Numeric" - } - ], - "parameters": { - "Left": [ - "Int8" - ] - }, - "nullable": [ - false - ] - }, - "hash": "67d494c0b8818b3df09d091400626271156754cabcaad1df7c0d9576b3273a6c" -} diff --git a/.sqlx/query-e1c24a57013cbc64f463d3a49cb68989eced49b475c0bbab90b21908ae0e77b4.json b/.sqlx/query-e1c24a57013cbc64f463d3a49cb68989eced49b475c0bbab90b21908ae0e77b4.json deleted file mode 100644 index dc23d4e2..00000000 --- a/.sqlx/query-e1c24a57013cbc64f463d3a49cb68989eced49b475c0bbab90b21908ae0e77b4.json +++ /dev/null @@ -1,15 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n UPDATE users u\n SET balance = u.balance + v.amount\n FROM unnest($1::BIGINT[], $2::NUMERIC[]) AS v(id, amount)\n WHERE u.id = v.id\n ", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Int8Array", - "NumericArray" - ] - }, - "nullable": [] - }, - "hash": "e1c24a57013cbc64f463d3a49cb68989eced49b475c0bbab90b21908ae0e77b4" -} diff --git a/.sqlx/query-1c30a8a31b031f194f70dc2a3bac8e131513dd7e9d2c46432ca797f6422c6ecf.json b/.sqlx/query-fa5b05775f18d1268bbeece1f5f1b0c1930289eb797cf340d961ac69d2c2ceba.json similarity index 52% rename from .sqlx/query-1c30a8a31b031f194f70dc2a3bac8e131513dd7e9d2c46432ca797f6422c6ecf.json rename to .sqlx/query-fa5b05775f18d1268bbeece1f5f1b0c1930289eb797cf340d961ac69d2c2ceba.json index d7f865b0..ff6b3926 100644 --- a/.sqlx/query-1c30a8a31b031f194f70dc2a3bac8e131513dd7e9d2c46432ca797f6422c6ecf.json +++ b/.sqlx/query-fa5b05775f18d1268bbeece1f5f1b0c1930289eb797cf340d961ac69d2c2ceba.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n INSERT INTO payouts_values (user_id, mod_id, amount, created)\n SELECT * FROM UNNEST ($1::bigint[], $2::bigint[], $3::numeric[], $4::timestamptz[])\n ", + "query": "\n INSERT INTO payouts_values (user_id, mod_id, amount, created, date_available)\n SELECT * FROM UNNEST ($1::bigint[], $2::bigint[], $3::numeric[], $4::timestamptz[], $5::timestamptz[])\n ", "describe": { "columns": [], "parameters": { @@ -8,10 +8,11 @@ "Int8Array", "Int8Array", "NumericArray", + "TimestamptzArray", "TimestamptzArray" ] }, "nullable": [] }, - "hash": "1c30a8a31b031f194f70dc2a3bac8e131513dd7e9d2c46432ca797f6422c6ecf" + "hash": "fa5b05775f18d1268bbeece1f5f1b0c1930289eb797cf340d961ac69d2c2ceba" } diff --git a/.sqlx/query-ff474fba4d18f7788b1a1900a6e5549f0899689e857634cfc5d85bd7b8718c46.json b/.sqlx/query-ff474fba4d18f7788b1a1900a6e5549f0899689e857634cfc5d85bd7b8718c46.json deleted file mode 100644 index 6b1ae87a..00000000 --- a/.sqlx/query-ff474fba4d18f7788b1a1900a6e5549f0899689e857634cfc5d85bd7b8718c46.json +++ /dev/null @@ -1,15 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n UPDATE users\n SET balance = balance + $1\n WHERE id = $2\n ", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Numeric", - "Int8" - ] - }, - "nullable": [] - }, - "hash": "ff474fba4d18f7788b1a1900a6e5549f0899689e857634cfc5d85bd7b8718c46" -} diff --git a/migrations/20240911044738_payouts-updates.sql b/migrations/20240911044738_payouts-updates.sql new file mode 100644 index 00000000..824c8204 --- /dev/null +++ b/migrations/20240911044738_payouts-updates.sql @@ -0,0 +1,2 @@ +ALTER TABLE payouts_values ADD COLUMN date_available timestamptz NOT NULL DEFAULT now(); +ALTER TABLE payouts_values ALTER COLUMN date_available DROP DEFAULT; \ No newline at end of file diff --git a/src/auth/validate.rs b/src/auth/validate.rs index 4f13af33..2037726e 100644 --- a/src/auth/validate.rs +++ b/src/auth/validate.rs @@ -9,6 +9,7 @@ use crate::routes::internal::session::get_session_metadata; use actix_web::http::header::{HeaderValue, AUTHORIZATION}; use actix_web::HttpRequest; use chrono::Utc; +use rust_decimal::Decimal; pub async fn get_user_from_headers<'a, E>( req: &HttpRequest, @@ -66,7 +67,7 @@ where paypal_address: db_user.paypal_email, paypal_country: db_user.paypal_country, venmo_handle: db_user.venmo_handle, - balance: db_user.balance, + balance: Decimal::ZERO, }), stripe_customer_id: db_user.stripe_customer_id, }; diff --git a/src/database/models/user_item.rs b/src/database/models/user_item.rs index a4f1cac7..8a655d1a 100644 --- a/src/database/models/user_item.rs +++ b/src/database/models/user_item.rs @@ -7,7 +7,6 @@ use crate::models::ids::base62_impl::{parse_base62, to_base62}; use crate::models::users::Badges; use chrono::{DateTime, Utc}; use dashmap::DashMap; -use rust_decimal::Decimal; use serde::{Deserialize, Serialize}; use std::fmt::{Debug, Display}; use std::hash::Hash; @@ -45,8 +44,6 @@ pub struct User { pub created: DateTime, pub role: String, pub badges: Badges, - - pub balance: Decimal, } impl User { @@ -169,7 +166,6 @@ impl User { SELECT id, email, avatar_url, raw_avatar_url, username, bio, created, role, badges, - balance, github_id, discord_id, gitlab_id, google_id, steam_id, microsoft_id, email_verified, password, totp_secret, paypal_id, paypal_country, paypal_email, venmo_handle, stripe_customer_id @@ -198,7 +194,6 @@ impl User { created: u.created, role: u.role, badges: Badges::from_bits(u.badges as u64).unwrap_or_default(), - balance: u.balance, password: u.password, paypal_id: u.paypal_id, paypal_country: u.paypal_country, diff --git a/src/lib.rs b/src/lib.rs index 6de882ad..6ba0da60 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -242,16 +242,14 @@ pub fn app_setup( { let pool_ref = pool.clone(); - let redis_ref = redis_pool.clone(); let client_ref = clickhouse.clone(); scheduler.run(std::time::Duration::from_secs(60 * 60 * 6), move || { let pool_ref = pool_ref.clone(); - let redis_ref = redis_ref.clone(); let client_ref = client_ref.clone(); async move { info!("Started running payouts"); - let result = process_payout(&pool_ref, &redis_ref, &client_ref).await; + let result = process_payout(&pool_ref, &client_ref).await; if let Err(e) = result { warn!("Payouts run failed: {:?}", e); } @@ -451,12 +449,12 @@ pub fn check_env_vars() -> bool { failed |= check_var::("MAXMIND_LICENSE_KEY"); - failed |= check_var::("PAYOUTS_BUDGET"); - failed |= check_var::("FLAME_ANVIL_URL"); failed |= check_var::("STRIPE_API_KEY"); failed |= check_var::("STRIPE_WEBHOOK_SECRET"); + failed |= check_var::("ADITUDE_API_KEY"); + failed } diff --git a/src/queue/payouts.rs b/src/queue/payouts.rs index 11716379..e7188c7f 100644 --- a/src/queue/payouts.rs +++ b/src/queue/payouts.rs @@ -1,11 +1,10 @@ use crate::models::payouts::{ PayoutDecimal, PayoutInterval, PayoutMethod, PayoutMethodFee, PayoutMethodType, }; +use crate::models::projects::MonetizationStatus; use crate::routes::ApiError; -use crate::util::env::parse_var; -use crate::{database::redis::RedisPool, models::projects::MonetizationStatus}; use base64::Engine; -use chrono::{DateTime, Datelike, Duration, Utc, Weekday}; +use chrono::{DateTime, Datelike, Duration, TimeZone, Utc}; use dashmap::DashMap; use futures::TryStreamExt; use reqwest::Method; @@ -511,11 +510,55 @@ impl PayoutsQueue { } } -pub async fn process_payout( - pool: &PgPool, - redis: &RedisPool, - client: &clickhouse::Client, -) -> Result<(), ApiError> { +#[derive(Deserialize)] +pub struct AditudePoints { + #[serde(rename = "pointsList")] + pub points_list: Vec, +} + +#[derive(Deserialize)] +pub struct AditudePoint { + pub metric: AditudeMetric, + pub time: AditudeTime, +} + +#[derive(Deserialize)] +pub struct AditudeMetric { + pub revenue: Option, + pub impressions: Option, + pub cpm: Option, +} + +#[derive(Deserialize)] +pub struct AditudeTime { + pub seconds: u64, +} + +pub async fn make_aditude_request( + metrics: &[&str], + range: &str, + interval: &str, +) -> Result, ApiError> { + let request = reqwest::Client::new() + .post("https://cloud.aditude.io/api/public/insights/metrics") + .bearer_auth(&dotenvy::var("ADITUDE_API_KEY")?) + .json(&serde_json::json!({ + "metrics": metrics, + "range": range, + "interval": interval + })) + .send() + .await? + .error_for_status()?; + + let text = request.text().await?; + + let json: Vec = serde_json::from_str(&text)?; + + Ok(json) +} + +pub async fn process_payout(pool: &PgPool, client: &clickhouse::Client) -> Result<(), ApiError> { let start: DateTime = DateTime::from_naive_utc_and_offset( (Utc::now() - Duration::days(1)) .date_naive() @@ -707,24 +750,62 @@ pub async fn process_payout( ); } - let amount = Decimal::from(parse_var::("PAYOUTS_BUDGET").unwrap_or(0)); - - let days = Decimal::from(28); - let weekdays = Decimal::from(20); - let weekend_bonus = Decimal::from(5) / Decimal::from(4); - - let weekday_amount = amount / (weekdays + (weekend_bonus) * (days - weekdays)); - let weekend_amount = weekday_amount * weekend_bonus; + let aditude_res = + make_aditude_request(&["METRIC_IMPRESSIONS", "METRIC_REVENUE"], "Yesterday", "1d").await?; + + let aditude_amount: Decimal = aditude_res + .iter() + .map(|x| { + x.points_list + .iter() + .filter_map(|x| x.metric.revenue) + .sum::() + }) + .sum(); + let aditude_impressions: u128 = aditude_res + .iter() + .map(|x| { + x.points_list + .iter() + .filter_map(|x| x.metric.impressions) + .sum::() + }) + .sum(); + + // Modrinth's share of ad revenue + let modrinth_cut = Decimal::from(1) / Decimal::from(4); + // Clean.io fee (ad antimalware). Per 1000 impressions. + let clean_io_fee = Decimal::from(8) / Decimal::from(1000); + + let net_revenue = + aditude_amount - (clean_io_fee * Decimal::from(aditude_impressions) / Decimal::from(1000)); + + let payout = net_revenue * (Decimal::from(1) - modrinth_cut); + + // Ad payouts are Net 60 from the end of the month + let available = { + let now = Utc::now().date_naive(); + + let year = now.year(); + let month = now.month(); + + // Get the first day of the next month + let last_day_of_month = if month == 12 { + Utc.with_ymd_and_hms(year + 1, 1, 1, 0, 0, 0).unwrap() + } else { + Utc.with_ymd_and_hms(year, month + 1, 1, 0, 0, 0).unwrap() + }; - let payout = match start.weekday() { - Weekday::Sat | Weekday::Sun => weekend_amount, - _ => weekday_amount, + last_day_of_month + Duration::days(59) }; - let mut clear_cache_users = Vec::new(); - let (mut insert_user_ids, mut insert_project_ids, mut insert_payouts, mut insert_starts) = - (Vec::new(), Vec::new(), Vec::new(), Vec::new()); - let mut update_user_balance: HashMap = HashMap::new(); + let ( + mut insert_user_ids, + mut insert_project_ids, + mut insert_payouts, + mut insert_starts, + mut insert_availables, + ) = (Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new()); for (id, project) in projects_map { if let Some(value) = &multipliers.values.get(&(id as u64)) { let project_multiplier: Decimal = @@ -741,62 +822,29 @@ pub async fn process_payout( insert_project_ids.push(id); insert_payouts.push(payout); insert_starts.push(start); - - *update_user_balance.entry(user_id).or_default() += payout; - - clear_cache_users.push(user_id); + insert_availables.push(available); } } } } } - let (mut update_user_ids, mut update_user_balances) = (Vec::new(), Vec::new()); - - for (user_id, payout) in update_user_balance { - update_user_ids.push(user_id); - update_user_balances.push(payout); - } - - sqlx::query!( - " - UPDATE users u - SET balance = u.balance + v.amount - FROM unnest($1::BIGINT[], $2::NUMERIC[]) AS v(id, amount) - WHERE u.id = v.id - ", - &update_user_ids, - &update_user_balances - ) - .execute(&mut *transaction) - .await?; - sqlx::query!( " - INSERT INTO payouts_values (user_id, mod_id, amount, created) - SELECT * FROM UNNEST ($1::bigint[], $2::bigint[], $3::numeric[], $4::timestamptz[]) + INSERT INTO payouts_values (user_id, mod_id, amount, created, date_available) + SELECT * FROM UNNEST ($1::bigint[], $2::bigint[], $3::numeric[], $4::timestamptz[], $5::timestamptz[]) ", &insert_user_ids[..], &insert_project_ids[..], &insert_payouts[..], - &insert_starts[..] + &insert_starts[..], + &insert_availables[..] ) .execute(&mut *transaction) .await?; transaction.commit().await?; - if !clear_cache_users.is_empty() { - crate::database::models::User::clear_caches( - &clear_cache_users - .into_iter() - .map(|x| (crate::database::models::UserId(x), None)) - .collect::>(), - redis, - ) - .await?; - } - Ok(()) } @@ -806,17 +854,19 @@ pub async fn insert_payouts( insert_project_ids: Vec, insert_payouts: Vec, insert_starts: Vec>, + insert_availables: Vec>, transaction: &mut sqlx::Transaction<'_, sqlx::Postgres>, ) -> sqlx::Result { sqlx::query!( " - INSERT INTO payouts_values (user_id, mod_id, amount, created) - SELECT * FROM UNNEST ($1::bigint[], $2::bigint[], $3::numeric[], $4::timestamptz[]) + INSERT INTO payouts_values (user_id, mod_id, amount, created, date_available) + SELECT * FROM UNNEST ($1::bigint[], $2::bigint[], $3::numeric[], $4::timestamptz[], $5::timestamptz[]) ", &insert_user_ids[..], &insert_project_ids[..], &insert_payouts[..], - &insert_starts[..] + &insert_starts[..], + &insert_availables[..], ) .execute(&mut **transaction) .await diff --git a/src/routes/internal/flows.rs b/src/routes/internal/flows.rs index c0883e64..accbfbc6 100644 --- a/src/routes/internal/flows.rs +++ b/src/routes/internal/flows.rs @@ -27,7 +27,6 @@ use chrono::{Duration, Utc}; use rand_chacha::rand_core::SeedableRng; use rand_chacha::ChaCha20Rng; use reqwest::header::AUTHORIZATION; -use rust_decimal::Decimal; use serde::{Deserialize, Serialize}; use sqlx::postgres::PgPool; use std::collections::HashMap; @@ -225,7 +224,6 @@ impl TempUser { created: Utc::now(), role: Role::Developer.to_string(), badges: Badges::default(), - balance: Decimal::ZERO, } .insert(transaction) .await?; @@ -1521,7 +1519,6 @@ pub async fn create_account_with_password( created: Utc::now(), role: Role::Developer.to_string(), badges: Badges::default(), - balance: Decimal::ZERO, } .insert(&mut transaction) .await?; diff --git a/src/routes/v3/payouts.rs b/src/routes/v3/payouts.rs index 52492b4c..aee5df2d 100644 --- a/src/routes/v3/payouts.rs +++ b/src/routes/v3/payouts.rs @@ -5,19 +5,20 @@ use crate::database::redis::RedisPool; use crate::models::ids::PayoutId; use crate::models::pats::Scopes; use crate::models::payouts::{PayoutMethodType, PayoutStatus}; -use crate::queue::payouts::PayoutsQueue; +use crate::queue::payouts::{make_aditude_request, PayoutsQueue}; use crate::queue::session::AuthQueue; use crate::routes::ApiError; use actix_web::{delete, get, post, web, HttpRequest, HttpResponse}; -use chrono::Utc; +use chrono::{Datelike, Duration, TimeZone, Utc, Weekday}; use hex::ToHex; use hmac::{Hmac, Mac, NewMac}; use reqwest::Method; use rust_decimal::Decimal; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; use serde_json::json; use sha2::Sha256; use sqlx::PgPool; +use std::collections::HashMap; pub fn config(cfg: &mut web::ServiceConfig) { cfg.service( @@ -27,7 +28,9 @@ pub fn config(cfg: &mut web::ServiceConfig) { .service(user_payouts) .service(create_payout) .service(cancel_payout) - .service(payment_methods), + .service(payment_methods) + .service(get_balance) + .service(platform_revenue), ); } @@ -128,27 +131,6 @@ pub async fn paypal_webhook( .await?; if let Some(result) = result { - sqlx::query!( - " - SELECT balance FROM users WHERE id = $1 FOR UPDATE - ", - result.user_id - ) - .fetch_optional(&mut *transaction) - .await?; - - sqlx::query!( - " - UPDATE users - SET balance = balance + $1 - WHERE id = $2 - ", - result.amount + result.fee.unwrap_or(Decimal::ZERO), - result.user_id - ) - .execute(&mut *transaction) - .await?; - sqlx::query!( " UPDATE payouts @@ -253,27 +235,6 @@ pub async fn tremendous_webhook( .await?; if let Some(result) = result { - sqlx::query!( - " - SELECT balance FROM users WHERE id = $1 FOR UPDATE - ", - result.user_id - ) - .fetch_optional(&mut *transaction) - .await?; - - sqlx::query!( - " - UPDATE users - SET balance = balance + $1 - WHERE id = $2 - ", - result.amount + result.fee.unwrap_or(Decimal::ZERO), - result.user_id - ) - .execute(&mut *transaction) - .await?; - sqlx::query!( " UPDATE payouts @@ -380,7 +341,19 @@ pub async fn create_payout( )); } - if user.balance < body.amount || body.amount < Decimal::ZERO { + let mut transaction = pool.begin().await?; + + sqlx::query!( + " + SELECT balance FROM users WHERE id = $1 FOR UPDATE + ", + user.id.0 + ) + .fetch_optional(&mut *transaction) + .await?; + + let balance = get_user_balance(user.id.into(), &**pool).await?; + if balance.available < body.amount || body.amount < Decimal::ZERO { return Err(ApiError::InvalidInput( "You do not have enough funds to make this payout!".to_string(), )); @@ -408,17 +381,6 @@ pub async fn create_payout( )); } - let mut transaction = pool.begin().await?; - - sqlx::query!( - " - SELECT balance FROM users WHERE id = $1 FOR UPDATE - ", - user.id.0 - ) - .fetch_optional(&mut *transaction) - .await?; - let payout_id = generate_payout_id(&mut transaction).await?; let payout_item = match body.method { @@ -620,17 +582,6 @@ pub async fn create_payout( } }; - sqlx::query!( - " - UPDATE users - SET balance = balance - $1 - WHERE id = $2 - ", - body.amount, - user.id as crate::database::models::ids::UserId - ) - .execute(&mut *transaction) - .await?; payout_item.insert(&mut transaction).await?; transaction.commit().await?; @@ -759,3 +710,225 @@ pub async fn payment_methods( Ok(HttpResponse::Ok().json(methods)) } + +#[derive(Serialize)] +pub struct UserBalance { + pub available: Decimal, + pub pending: Decimal, +} + +#[get("balance")] +pub async fn get_balance( + req: HttpRequest, + pool: web::Data, + redis: web::Data, + session_queue: web::Data, +) -> Result { + let user = get_user_from_headers( + &req, + &**pool, + &redis, + &session_queue, + Some(&[Scopes::PAYOUTS_READ]), + ) + .await? + .1; + + let balance = get_user_balance(user.id.into(), &**pool).await?; + + Ok(HttpResponse::Ok().json(balance)) +} + +async fn get_user_balance( + user_id: crate::database::models::ids::UserId, + pool: &PgPool, +) -> Result { + let available = sqlx::query!( + " + SELECT SUM(amount) + FROM payouts_values + WHERE user_id = $1 AND date_available <= NOW() + ", + user_id.0 + ) + .fetch_optional(pool) + .await?; + + let pending = sqlx::query!( + " + SELECT SUM(amount) + FROM payouts_values + WHERE user_id = $1 AND date_available > NOW() + ", + user_id.0 + ) + .fetch_optional(pool) + .await?; + + let withdrawn = sqlx::query!( + " + SELECT SUM(amount) amount, SUM(fee) fee + FROM payouts + WHERE user_id = $1 AND (status = 'success' OR status = 'in-transit') + ", + user_id.0 + ) + .fetch_optional(pool) + .await?; + + let available = available + .map(|x| x.sum.unwrap_or(Decimal::ZERO)) + .unwrap_or(Decimal::ZERO); + let pending = pending + .map(|x| x.sum.unwrap_or(Decimal::ZERO)) + .unwrap_or(Decimal::ZERO); + let (withdrawn, fees) = withdrawn + .map(|x| { + ( + x.amount.unwrap_or(Decimal::ZERO), + x.fee.unwrap_or(Decimal::ZERO), + ) + }) + .unwrap_or((Decimal::ZERO, Decimal::ZERO)); + + Ok(UserBalance { + available: available.round_dp(16) - withdrawn.round_dp(16) - fees.round_dp(16), + pending, + }) +} + +#[derive(Serialize, Deserialize)] +pub struct RevenueResponse { + pub all_time: Decimal, + pub data: Vec, +} + +#[derive(Serialize, Deserialize)] +pub struct RevenueData { + pub time: u64, + pub revenue: Decimal, + pub creator_revenue: Decimal, +} + +#[get("platform_revenue")] +pub async fn platform_revenue( + pool: web::Data, + redis: web::Data, +) -> Result { + let mut redis = redis.connect().await?; + + const PLATFORM_REVENUE_NAMESPACE: &str = "platform_revenue"; + + let res: Option = redis + .get_deserialized_from_json(PLATFORM_REVENUE_NAMESPACE, "0") + .await?; + + if let Some(res) = res { + return Ok(HttpResponse::Ok().json(res)); + } + + let all_time_payouts = sqlx::query!( + " + SELECT SUM(amount) from payouts_values + ", + ) + .fetch_optional(&**pool) + .await? + .and_then(|x| x.sum) + .unwrap_or(Decimal::ZERO); + + let points = + make_aditude_request(&["METRIC_REVENUE", "METRIC_IMPRESSIONS"], "30d", "1d").await?; + + let mut points_map = HashMap::new(); + + for point in points { + for point in point.points_list { + let entry = points_map.entry(point.time.seconds).or_insert((None, None)); + + if let Some(revenue) = point.metric.revenue { + entry.0 = Some(revenue); + } + + if let Some(impressions) = point.metric.impressions { + entry.1 = Some(impressions); + } + } + } + + let mut revenue_data = Vec::new(); + let now = Utc::now(); + + for i in 1..=30 { + let time = now - Duration::days(i); + let start = time + .date_naive() + .and_hms_opt(0, 0, 0) + .unwrap() + .and_utc() + .timestamp(); + + if let Some((revenue, impressions)) = points_map.remove(&(start as u64)) { + // Before 9/5/24, when legacy payouts were in effect. + if start >= 1725494400 { + let revenue = revenue.unwrap_or(Decimal::ZERO); + let impressions = impressions.unwrap_or(0); + + // Modrinth's share of ad revenue + let modrinth_cut = Decimal::from(1) / Decimal::from(4); + // Clean.io fee (ad antimalware). Per 1000 impressions. + let clean_io_fee = Decimal::from(8) / Decimal::from(1000); + + let net_revenue = + revenue - (clean_io_fee * Decimal::from(impressions) / Decimal::from(1000)); + + let payout = net_revenue * (Decimal::from(1) - modrinth_cut); + + revenue_data.push(RevenueData { + time: start as u64, + revenue: net_revenue, + creator_revenue: payout, + }); + + continue; + } + } + + revenue_data.push(get_legacy_data_point(start as u64)); + } + + let res = RevenueResponse { + all_time: all_time_payouts, + data: revenue_data, + }; + + redis + .set_serialized_to_json(PLATFORM_REVENUE_NAMESPACE, 0, &res, None) + .await?; + + Ok(HttpResponse::Ok().json(res)) +} + +fn get_legacy_data_point(timestamp: u64) -> RevenueData { + let start = Utc.timestamp_opt(timestamp as i64, 0).unwrap(); + + let old_payouts_budget = Decimal::from(10_000); + + let days = Decimal::from(28); + let weekdays = Decimal::from(20); + let weekend_bonus = Decimal::from(5) / Decimal::from(4); + + let weekday_amount = old_payouts_budget / (weekdays + (weekend_bonus) * (days - weekdays)); + let weekend_amount = weekday_amount * weekend_bonus; + + let payout = match start.weekday() { + Weekday::Sat | Weekday::Sun => weekend_amount, + _ => weekday_amount, + }; + + RevenueData { + time: timestamp, + revenue: payout, + creator_revenue: payout * (Decimal::from(9) / Decimal::from(10)), + } +} From 99d7efbba889495f562846af470181a0e62ebc09 Mon Sep 17 00:00:00 2001 From: Jai A Date: Thu, 12 Sep 2024 14:30:59 -0700 Subject: [PATCH 2/3] Update expiry --- src/routes/v3/payouts.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/routes/v3/payouts.rs b/src/routes/v3/payouts.rs index aee5df2d..f2d1d6fb 100644 --- a/src/routes/v3/payouts.rs +++ b/src/routes/v3/payouts.rs @@ -903,7 +903,7 @@ pub async fn platform_revenue( }; redis - .set_serialized_to_json(PLATFORM_REVENUE_NAMESPACE, 0, &res, None) + .set_serialized_to_json(PLATFORM_REVENUE_NAMESPACE, 0, &res, Some(60 * 60)) .await?; Ok(HttpResponse::Ok().json(res)) From 3ed206ac0f094c044223857ed32aa31b2b48e56b Mon Sep 17 00:00:00 2001 From: Jai A Date: Thu, 12 Sep 2024 14:45:35 -0700 Subject: [PATCH 3/3] Fix tests --- tests/analytics.rs | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/tests/analytics.rs b/tests/analytics.rs index 13c44806..b3c1fcef 100644 --- a/tests/analytics.rs +++ b/tests/analytics.rs @@ -24,8 +24,13 @@ pub async fn analytics_revenue() { let pool = test_env.db.pool.clone(); // Generate sample revenue data- directly insert into sql - let (mut insert_user_ids, mut insert_project_ids, mut insert_payouts, mut insert_starts) = - (Vec::new(), Vec::new(), Vec::new(), Vec::new()); + let ( + mut insert_user_ids, + mut insert_project_ids, + mut insert_payouts, + mut insert_starts, + mut insert_availables, + ) = (Vec::new(), Vec::new(), Vec::new(), Vec::new(), Vec::new()); // Note: these go from most recent to least recent let money_time_pairs: [(f64, DateTime); 10] = [ @@ -47,6 +52,7 @@ pub async fn analytics_revenue() { insert_project_ids.push(project_id); insert_payouts.push(Decimal::from_f64_retain(*money).unwrap()); insert_starts.push(*time); + insert_availables.push(*time); } let mut transaction = pool.begin().await.unwrap(); @@ -55,6 +61,7 @@ pub async fn analytics_revenue() { insert_project_ids, insert_payouts, insert_starts, + insert_availables, &mut transaction, ) .await