Skip to content
This repository has been archived by the owner on Oct 19, 2024. It is now read-only.

Commit

Permalink
finish
Browse files Browse the repository at this point in the history
  • Loading branch information
Geometrically committed Aug 3, 2024
1 parent f0dc542 commit 8bbb7b0
Show file tree
Hide file tree
Showing 4 changed files with 330 additions and 77 deletions.
67 changes: 66 additions & 1 deletion src/database/models/product_item.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
use crate::database::models::{DatabaseError, ProductId, ProductPriceId};
use crate::database::models::{product_item, DatabaseError, ProductId, ProductPriceId};
use crate::database::redis::RedisPool;
use crate::models::billing::{PriceInterval, ProductMetadata};
use dashmap::DashMap;
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use std::convert::TryFrom;
use std::convert::TryInto;

const PRODUCTS_NAMESPACE: &str = "products";

pub struct ProductItem {
pub id: ProductId,
pub metadata: ProductMetadata,
Expand Down Expand Up @@ -82,6 +86,67 @@ impl ProductItem {
}
}

#[derive(Deserialize, Serialize)]
pub struct QueryProduct {
pub id: ProductId,
pub metadata: ProductMetadata,
pub unitary: bool,
pub prices: Vec<ProductPriceItem>,
}

impl QueryProduct {
pub async fn list<'a, E>(exec: E, redis: &RedisPool) -> Result<Vec<QueryProduct>, DatabaseError>
where
E: sqlx::Executor<'a, Database = sqlx::Postgres> + Copy,
{
let mut redis = redis.connect().await?;

let res: Option<Vec<QueryProduct>> = redis
.get_deserialized_from_json(PRODUCTS_NAMESPACE, "all")
.await?;

if let Some(res) = res {
return Ok(res);
}

let all_products = product_item::ProductItem::get_all(exec).await?;
let prices = product_item::ProductPriceItem::get_all_products_prices(
&all_products.iter().map(|x| x.id).collect::<Vec<_>>(),
exec,
)
.await?;

let products = all_products
.into_iter()
.map(|x| QueryProduct {
id: x.id.into(),
metadata: x.metadata,
prices: prices
.remove(&x.id)
.map(|x| x.1)
.unwrap_or_default()
.into_iter()
.map(|x| ProductPriceItem {
id: x.id,
product_id: x.product_id,
interval: x.interval,
price: x.price,
currency_code: x.currency_code,
})
.collect(),
unitary: x.unitary,
})
.collect::<Vec<_>>();

redis
.set_serialized_to_json(PRODUCTS_NAMESPACE, "all", &products, None)
.await?;

Ok(products)
}
}

#[derive(Deserialize, Serialize)]
pub struct ProductPriceItem {
pub id: ProductPriceId,
pub product_id: ProductId,
Expand Down
11 changes: 11 additions & 0 deletions src/database/models/user_subscription_item.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,17 @@ impl UserSubscriptionItem {
Ok(results.into_iter().map(|r| r.into()).collect())
}

pub async fn get_all_expired(
exec: impl sqlx::Executor<'_, Database = sqlx::Postgres>,
) -> Result<Vec<UserSubscriptionItem>, DatabaseError> {
let now = Utc::now();
let results = select_user_subscriptions_with_predicate!("WHERE expires < $1", now)
.fetch_all(exec)
.await?;

Ok(results.into_iter().map(|r| r.into()).collect())
}

pub async fn upsert(
&self,
transaction: &mut sqlx::Transaction<'_, sqlx::Postgres>,
Expand Down
35 changes: 14 additions & 21 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,14 +76,16 @@ pub fn app_setup(

let automated_moderation_queue = web::Data::new(AutomatedModerationQueue::default());

let automated_moderation_queue_ref = automated_moderation_queue.clone();
let pool_ref = pool.clone();
let redis_pool_ref = redis_pool.clone();
actix_rt::spawn(async move {
automated_moderation_queue_ref
.task(pool_ref, redis_pool_ref)
.await;
});
{
let automated_moderation_queue_ref = automated_moderation_queue.clone();
let pool_ref = pool.clone();
let redis_pool_ref = redis_pool.clone();
actix_rt::spawn(async move {
automated_moderation_queue_ref
.task(pool_ref, redis_pool_ref)
.await;
});
}

let mut scheduler = scheduler::Scheduler::new();

Expand Down Expand Up @@ -258,21 +260,14 @@ pub fn app_setup(
});
}

let stripe_client = stripe::Client::new(dotenvy::var("STRIPE_API_KEY").unwrap());
{
let pool_ref = pool.clone();
let redis_ref = redis_pool.clone();
scheduler.run(std::time::Duration::from_secs(60 * 30), move || {
let pool_ref = pool_ref.clone();
let redis_ref = redis_ref.clone();
let stripe_client_ref = stripe_client.clone();

async move {
info!("Indexing billing queue");
let result = crate::routes::internal::billing::task(&pool_ref, &redis_ref).await;
if let Err(e) = result {
warn!("Indexing billing queue failed: {:?}", e);
}
info!("Done indexing billing queue");
}
actix_rt::spawn(async move {
routes::internal::billing::task(stripe_client_ref, pool_ref, redis_ref).await;
});
}

Expand All @@ -283,8 +278,6 @@ pub fn app_setup(
let payouts_queue = web::Data::new(PayoutsQueue::new());
let active_sockets = web::Data::new(RwLock::new(ActiveSockets::default()));

let stripe_client = stripe::Client::new(dotenvy::var("STRIPE_API_KEY").unwrap());

LabrinthConfig {
pool,
redis_pool,
Expand Down
Loading

0 comments on commit 8bbb7b0

Please sign in to comment.