Skip to content

Commit

Permalink
feat: Send webhook on subscription created (#423)
Browse files Browse the repository at this point in the history
  • Loading branch information
azhur authored Dec 9, 2024
1 parent 4bd863c commit cd761be
Show file tree
Hide file tree
Showing 9 changed files with 211 additions and 44 deletions.
3 changes: 3 additions & 0 deletions docker/develop/init-debezium-connectors.sh
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ localhost:8083/connectors \
"schema.include.list": "public",
"table.include.list": "public.outbox_event",
"topic.prefix": "outbox.event",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"key.converter.schemas.enable": "false",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"plugin.name": "pgoutput",
"transforms": "outbox",
"transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ impl SubscriptionRow {
.into_db_result()
}

pub async fn get_subscriptions_by_ids(
pub async fn list_subscriptions_by_ids(
conn: &mut PgConn,
tenant_id_param: &uuid::Uuid,
subscription_ids: &[uuid::Uuid],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,11 +176,17 @@ pub struct CustomerPatch {

#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct Address {
#[serde(skip_serializing_if = "Option::is_none")]
pub line1: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub line2: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub city: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub country: Option<String>, // TODO mandatory ?
#[serde(skip_serializing_if = "Option::is_none")]
pub state: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub zip_code: Option<String>,
}

Expand Down Expand Up @@ -210,6 +216,7 @@ impl TryInto<serde_json::Value> for Address {

#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct ShippingAddress {
#[serde(skip_serializing_if = "Option::is_none")]
pub address: Option<Address>,
pub same_as_billing: bool,
}
Expand Down
63 changes: 61 additions & 2 deletions modules/meteroid/crates/meteroid-store/src/domain/outbox_event.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use crate::domain::{Address, Customer, ShippingAddress};
use crate::domain::enums::BillingPeriodEnum;
use crate::domain::{Address, Customer, ShippingAddress, Subscription};
use crate::errors::{StoreError, StoreErrorReport};
use crate::utils::local_id::{IdType, LocalId};
use crate::StoreResult;
use chrono::{NaiveDate, NaiveDateTime};
use diesel_models::outbox_event::OutboxEventRowNew;
use error_stack::Report;
use o2o::o2o;
Expand Down Expand Up @@ -40,11 +42,20 @@ impl OutboxEvent {
}
}

pub fn payload_json(&self) -> StoreResult<Option<serde_json::Value>> {
pub fn subscription_created(event: SubscriptionCreatedEvent) -> OutboxEvent {
OutboxEvent {
tenant_id: event.tenant_id,
aggregate_id: event.id,
event_type: EventType::SubscriptionCreated(Box::new(event)),
}
}

fn payload_json(&self) -> StoreResult<Option<serde_json::Value>> {
match &self.event_type {
EventType::CustomerCreated(event) => Ok(Some(Self::event_json(event)?)),
EventType::InvoiceFinalized => Ok(None),
EventType::InvoicePdfRequested => Ok(None),
EventType::SubscriptionCreated(event) => Ok(Some(Self::event_json(event)?)),
}
}

Expand All @@ -70,6 +81,8 @@ pub enum EventType {
InvoiceFinalized,
#[strum(serialize = "invoice.pdf.requested")]
InvoicePdfRequested,
#[strum(serialize = "subscription.created")]
SubscriptionCreated(Box<SubscriptionCreatedEvent>),
}

impl EventType {
Expand All @@ -78,6 +91,7 @@ impl EventType {
EventType::CustomerCreated(_) => "customer".to_string(),
EventType::InvoiceFinalized => "invoice".to_string(),
EventType::InvoicePdfRequested => "invoice".to_string(),
EventType::SubscriptionCreated(_) => "subscription".to_string(),
}
}
}
Expand All @@ -104,11 +118,56 @@ pub struct CustomerCreatedEvent {
pub local_id: String,
pub tenant_id: Uuid,
pub name: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub alias: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub email: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub invoicing_email: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub phone: Option<String>,
pub currency: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub billing_address: Option<Address>,
#[serde(skip_serializing_if = "Option::is_none")]
pub shipping_address: Option<ShippingAddress>,
}

#[derive(Debug, Serialize, Deserialize, o2o)]
#[from_owned(Subscription)]
pub struct SubscriptionCreatedEvent {
pub id: Uuid,
pub local_id: String,
pub tenant_id: Uuid,
pub customer_id: Uuid,
pub customer_local_id: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub customer_alias: Option<String>,
pub customer_name: String,
pub billing_day: i16,
pub currency: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub trial_start_date: Option<NaiveDate>,
pub billing_start_date: NaiveDate,
#[serde(skip_serializing_if = "Option::is_none")]
pub billing_end_date: Option<NaiveDate>,
pub plan_id: Uuid,
pub plan_name: String,
pub plan_version_id: Uuid,
pub version: u32,
pub created_at: NaiveDateTime,
pub created_by: Uuid,
pub net_terms: u32,
#[serde(skip_serializing_if = "Option::is_none")]
pub invoice_memo: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub invoice_threshold: Option<rust_decimal::Decimal>,
#[serde(skip_serializing_if = "Option::is_none")]
pub activated_at: Option<NaiveDateTime>,
#[serde(skip_serializing_if = "Option::is_none")]
pub canceled_at: Option<NaiveDateTime>,
#[serde(skip_serializing_if = "Option::is_none")]
pub cancellation_reason: Option<String>,
pub mrr_cents: u64,
pub period: BillingPeriodEnum,
}
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ pub struct WebhookOutMessageNew {
#[serde(tag = "object", rename_all = "snake_case")]
pub enum WebhookOutMessagePayload {
Customer(serde_json::Value),
Subscription(serde_json::Value),
}

impl TryFrom<WebhookOutMessageNew> for svix::api::MessageIn {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ use crate::utils::decimals::ToSubunit;
use crate::{domain, StoreResult};
use chrono::{NaiveDate, NaiveTime};
use diesel_async::scoped_futures::ScopedFutureExt;
use diesel_async::AsyncConnection;
use diesel_models::errors::{DatabaseError, DatabaseErrorContainer};
use diesel_models::errors::DatabaseError;
use error_stack::{report, Report};
use itertools::Itertools;
use std::collections::HashMap;
Expand All @@ -28,6 +27,7 @@ use uuid::Uuid;
use crate::constants::Currencies;
use crate::domain::add_ons::AddOn;
use crate::domain::coupons::{Coupon, CouponDiscount};
use crate::domain::outbox_event::OutboxEvent;
use crate::domain::subscription_add_ons::SubscriptionAddOn;
use crate::repositories::historical_rates::HistoricalRatesInterface;
use crate::repositories::invoicing_entities::InvoicingEntityInterface;
Expand Down Expand Up @@ -465,39 +465,43 @@ impl SubscriptionInterface for Store {
let insertable_subscription_events: Vec<&SubscriptionEventRow> =
insertable.iter().map(|c| &c.event).collect();

let inserted_subscriptions = conn
.transaction(|conn| {
let inserted_subscriptions = self
.transaction_with(&mut conn, |conn| {
async move {
let inserted_subscriptions: Vec<CreatedSubscription> =
SubscriptionRow::insert_subscription_batch(conn, insertable_subscriptions)
.await
.map_err(Into::<DatabaseErrorContainer>::into)
.map_err(Into::<Report<StoreError>>::into)
.map(|v| v.into_iter().map(Into::into).collect())?;

SubscriptionComponentRow::insert_subscription_component_batch(
conn,
insertable_subscription_components,
)
.await
.map_err(Into::<DatabaseErrorContainer>::into)?;
.map_err(Into::<Report<StoreError>>::into)?;

SubscriptionAddOnRow::insert_batch(conn, insertable_subscription_add_ons)
.await
.map_err(Into::<DatabaseErrorContainer>::into)?;
.map_err(Into::<Report<StoreError>>::into)?;

apply_coupons(
conn,
&insertable_subscription_coupons,
&inserted_subscriptions,
tenant_id,
)
.await?;
.await
.map_err(Into::<Report<StoreError>>::into)?;

SubscriptionEventRow::insert_batch(conn, insertable_subscription_events)
.await
.map_err(Into::<DatabaseErrorContainer>::into)?;
.map_err(Into::<Report<StoreError>>::into)?;

Ok::<_, DatabaseErrorContainer>(inserted_subscriptions)
insert_created_outbox_events_tx(conn, self, &inserted_subscriptions, tenant_id)
.await?;

Ok::<_, Report<StoreError>>(inserted_subscriptions)
}
.scope_boxed()
})
Expand Down Expand Up @@ -938,9 +942,7 @@ async fn apply_coupons(
) -> DbResult<()> {
validate_coupons(tx_conn, subscription_coupons, subscriptions, tenant_id).await?;

AppliedCouponRow::insert_batch(tx_conn, subscription_coupons.to_vec())
.await
.map_err(Into::<DatabaseErrorContainer>::into)?;
AppliedCouponRow::insert_batch(tx_conn, subscription_coupons.to_vec()).await?;

CouponRow::update_last_redemption_at(
tx_conn,
Expand All @@ -951,16 +953,13 @@ async fn apply_coupons(
.collect::<Vec<_>>(),
chrono::Utc::now().naive_utc(),
)
.await
.map_err(Into::<DatabaseErrorContainer>::into)?;
.await?;

let subscriptions_by_coupon: HashMap<Uuid, usize> =
subscription_coupons.iter().counts_by(|x| x.coupon_id);

for (coupon_id, subscriptions_count) in subscriptions_by_coupon {
CouponRow::inc_redemption_count(tx_conn, coupon_id, subscriptions_count as i32)
.await
.map_err(Into::<DatabaseErrorContainer>::into)?;
CouponRow::inc_redemption_count(tx_conn, coupon_id, subscriptions_count as i32).await?;
}

Ok(())
Expand Down Expand Up @@ -1351,3 +1350,34 @@ pub fn subscription_to_draft(

Ok(invoice)
}

async fn insert_created_outbox_events_tx(
conn: &mut PgConn,
store: &Store,
created: &[CreatedSubscription],
tenant_id: Uuid,
) -> StoreResult<()> {
let ids = created.iter().map(|c| c.id).collect::<Vec<_>>();

if ids.is_empty() {
return Ok(());
}

let subscriptions: Vec<Subscription> =
SubscriptionRow::list_subscriptions_by_ids(conn, &tenant_id, &ids)
.await
.map_err(Into::<Report<StoreError>>::into)?
.into_iter()
.map(|s| s.into())
.collect();

let outbox_events: Vec<OutboxEvent> = subscriptions
.into_iter()
.map(|s| OutboxEvent::subscription_created(s.into()))
.collect();

store
.internal
.insert_outbox_events_tx(conn, outbox_events)
.await
}
23 changes: 11 additions & 12 deletions modules/meteroid/src/workers/kafka/outbox.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use chrono::{DateTime, Utc};
use meteroid_store::domain::outbox_event::CustomerCreatedEvent;
use meteroid_store::domain::outbox_event::{CustomerCreatedEvent, SubscriptionCreatedEvent};
use rdkafka::message::{BorrowedHeaders, BorrowedMessage, Headers};
use rdkafka::Message;
use serde::Deserialize;
use serde_json::Value;
use uuid::Uuid;

#[derive(Debug)]
Expand All @@ -20,6 +19,7 @@ pub enum EventType {
CustomerCreated(Box<CustomerCreatedEvent>),
InvoiceFinalized,
InvoicePdfRequested,
SubscriptionCreated(Box<SubscriptionCreatedEvent>),
}

impl EventType {
Expand All @@ -36,6 +36,10 @@ impl EventType {
}
"invoice.finalized" => Some(Self::InvoiceFinalized),
"invoice.pdf.requested" => Some(Self::InvoicePdfRequested),
"subscription.created" => {
let payload = extract_payload::<SubscriptionCreatedEvent>(m).ok()??;
Some(Self::SubscriptionCreated(Box::new(payload)))
}
_ => None,
}
}
Expand All @@ -48,10 +52,7 @@ pub(crate) fn parse_outbox_event(m: &BorrowedMessage<'_>) -> Option<OutboxEvent>
let id = headers.get_as_string("local_id")?;
let tenant_id = headers.get_as_uuid("tenant_id")?;

let aggregate_id: String = String::from_utf8(m.key()?.to_vec())
.ok()?
.trim_matches('"')
.into();
let aggregate_id: String = String::from_utf8(m.key()?.to_vec()).ok()?;

let event_type = EventType::from_kafka_message(m)?;

Expand All @@ -70,9 +71,7 @@ fn extract_payload<P: for<'a> Deserialize<'a>>(
m: &BorrowedMessage<'_>,
) -> Result<Option<P>, serde_json::Error> {
if let Some(payload) = m.payload() {
let parsed: Value = serde_json::from_slice(payload)?;
let payload = &parsed["payload"];
let parsed = serde_json::from_value(payload.clone())?;
let parsed = serde_json::from_slice(payload)?;
Ok(Some(parsed))
} else {
Ok(None)
Expand All @@ -81,7 +80,7 @@ fn extract_payload<P: for<'a> Deserialize<'a>>(

trait ParseableHeaders {
fn get_as_string(&self, key: &str) -> Option<String>;
fn get_as_uuid(&self, key: &str) -> Option<uuid::Uuid>;
fn get_as_uuid(&self, key: &str) -> Option<Uuid>;
}

impl ParseableHeaders for &BorrowedHeaders {
Expand All @@ -93,8 +92,8 @@ impl ParseableHeaders for &BorrowedHeaders {
String::from_utf8(header_value.to_vec()).ok()
}

fn get_as_uuid(&self, key: &str) -> Option<uuid::Uuid> {
fn get_as_uuid(&self, key: &str) -> Option<Uuid> {
self.get_as_string(key)
.and_then(|header_value| uuid::Uuid::parse_str(&header_value).ok())
.and_then(|header_value| Uuid::parse_str(&header_value).ok())
}
}
2 changes: 1 addition & 1 deletion modules/meteroid/src/workers/kafka/processors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use meteroid_store::Store;
use std::sync::Arc;

pub async fn run_webhook_outbox_processor(kafka_config: &KafkaConnectionConfig, store: Arc<Store>) {
let topics = vec!["outbox.event.customer"];
let topics = vec!["outbox.event.customer", "outbox.event.subscription"];
let group_id = "webhook_outbox_processor";

let handler = Arc::new(WebhookHandler::new(store));
Expand Down
Loading

0 comments on commit cd761be

Please sign in to comment.