Skip to content

Commit

Permalink
feat: Send webhooks for invoice events (#425)
Browse files Browse the repository at this point in the history
  • Loading branch information
azhur authored Dec 11, 2024
1 parent da0ca84 commit bd8990c
Show file tree
Hide file tree
Showing 8 changed files with 185 additions and 67 deletions.
71 changes: 55 additions & 16 deletions modules/meteroid/crates/meteroid-store/src/domain/outbox_event.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::domain::enums::BillingPeriodEnum;
use crate::domain::{Address, Customer, ShippingAddress, Subscription};
use crate::domain::enums::{BillingPeriodEnum, InvoiceStatusEnum};
use crate::domain::{Address, Customer, DetailedInvoice, ShippingAddress, Subscription};
use crate::errors::{StoreError, StoreErrorReport};
use crate::utils::local_id::{IdType, LocalId};
use crate::StoreResult;
Expand All @@ -18,7 +18,7 @@ pub struct OutboxEvent {
}

impl OutboxEvent {
pub fn customer_created(event: CustomerCreatedEvent) -> OutboxEvent {
pub fn customer_created(event: CustomerEvent) -> OutboxEvent {
OutboxEvent {
tenant_id: event.tenant_id,
aggregate_id: event.id,
Expand All @@ -34,15 +34,23 @@ impl OutboxEvent {
}
}

pub fn invoice_finalized(tenant_id: Uuid, invoice_id: Uuid) -> OutboxEvent {
pub fn invoice_created(event: InvoiceEvent) -> OutboxEvent {
OutboxEvent {
tenant_id,
aggregate_id: invoice_id,
event_type: EventType::InvoiceFinalized,
tenant_id: event.tenant_id,
aggregate_id: event.id,
event_type: EventType::InvoiceCreated(Box::new(event)),
}
}

pub fn subscription_created(event: SubscriptionCreatedEvent) -> OutboxEvent {
pub fn invoice_finalized(event: InvoiceEvent) -> OutboxEvent {
OutboxEvent {
tenant_id: event.tenant_id,
aggregate_id: event.id,
event_type: EventType::InvoiceFinalized(Box::new(event)),
}
}

pub fn subscription_created(event: SubscriptionEvent) -> OutboxEvent {
OutboxEvent {
tenant_id: event.tenant_id,
aggregate_id: event.id,
Expand All @@ -53,7 +61,8 @@ impl OutboxEvent {
fn payload_json(&self) -> StoreResult<Option<serde_json::Value>> {
match &self.event_type {
EventType::CustomerCreated(event) => Ok(Some(Self::event_json(event)?)),
EventType::InvoiceFinalized => Ok(None),
EventType::InvoiceCreated(event) => Ok(Some(Self::event_json(event)?)),
EventType::InvoiceFinalized(event) => Ok(Some(Self::event_json(event)?)),
EventType::InvoicePdfRequested => Ok(None),
EventType::SubscriptionCreated(event) => Ok(Some(Self::event_json(event)?)),
}
Expand All @@ -75,21 +84,23 @@ impl OutboxEvent {
#[derive(Display)]
pub enum EventType {
#[strum(serialize = "customer.created")]
CustomerCreated(Box<CustomerCreatedEvent>),
CustomerCreated(Box<CustomerEvent>),
#[strum(serialize = "invoice.created")]
InvoiceCreated(Box<InvoiceEvent>),
#[strum(serialize = "invoice.finalized")]
/// todo this needs payload as well
InvoiceFinalized,
InvoiceFinalized(Box<InvoiceEvent>),
#[strum(serialize = "invoice.pdf.requested")]
InvoicePdfRequested,
#[strum(serialize = "subscription.created")]
SubscriptionCreated(Box<SubscriptionCreatedEvent>),
SubscriptionCreated(Box<SubscriptionEvent>),
}

impl EventType {
pub fn aggregate_type(&self) -> String {
match self {
EventType::CustomerCreated(_) => "customer".to_string(),
EventType::InvoiceFinalized => "invoice".to_string(),
EventType::InvoiceCreated(_) => "invoice".to_string(),
EventType::InvoiceFinalized(_) => "invoice".to_string(),
EventType::InvoicePdfRequested => "invoice".to_string(),
EventType::SubscriptionCreated(_) => "subscription".to_string(),
}
Expand All @@ -113,7 +124,7 @@ impl TryInto<OutboxEventRowNew> for OutboxEvent {

#[derive(Debug, Serialize, Deserialize, o2o)]
#[from_owned(Customer)]
pub struct CustomerCreatedEvent {
pub struct CustomerEvent {
pub id: Uuid,
pub local_id: String,
pub tenant_id: Uuid,
Expand All @@ -135,7 +146,7 @@ pub struct CustomerCreatedEvent {

#[derive(Debug, Serialize, Deserialize, o2o)]
#[from_owned(Subscription)]
pub struct SubscriptionCreatedEvent {
pub struct SubscriptionEvent {
pub id: Uuid,
pub local_id: String,
pub tenant_id: Uuid,
Expand Down Expand Up @@ -171,3 +182,31 @@ pub struct SubscriptionCreatedEvent {
pub mrr_cents: u64,
pub period: BillingPeriodEnum,
}

#[derive(Debug, Serialize, Deserialize, o2o)]
#[from_owned(DetailedInvoice)]
pub struct InvoiceEvent {
#[map(@.invoice.id)]
pub id: Uuid,
#[map(@.invoice.local_id)]
pub local_id: String,
#[map(@.invoice.status)]
pub status: InvoiceStatusEnum,
#[map(@.invoice.tenant_id)]
pub tenant_id: Uuid,
#[map(@.invoice.customer_id)]
pub customer_id: Uuid,
#[map(@.customer.local_id)]
pub customer_local_id: String,
#[serde(skip_serializing_if = "Option::is_none")]
#[map(@.invoice.subscription_id)]
pub subscription_id: Option<Uuid>,
#[map(@.invoice.currency)]
pub currency: String,
#[map(@.invoice.tax_amount)]
pub tax_amount: i64,
#[map(@.invoice.total)]
pub total: i64,
#[map(@.invoice.created_at)]
pub created_at: NaiveDateTime,
}
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ pub struct WebhookOutMessageNew {
pub enum WebhookOutMessagePayload {
Customer(serde_json::Value),
Subscription(serde_json::Value),
Invoice(serde_json::Value),
}

impl TryFrom<WebhookOutMessageNew> for svix::api::MessageIn {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::domain::{
};
use crate::errors::StoreError;
use crate::repositories::customer_balance::CustomerBalance;
use crate::repositories::invoices::insert_invoice;
use crate::repositories::invoices::insert_invoice_tx;
use crate::repositories::invoicing_entities::InvoicingEntityInterface;
use crate::repositories::InvoiceInterface;
use crate::store::Store;
Expand Down Expand Up @@ -442,7 +442,7 @@ impl CustomersInterface for Store {
},
};

let inserted_invoice = insert_invoice(conn, invoice_new).await?;
let inserted_invoice = insert_invoice_tx(self, conn, invoice_new).await?;

InvoicingEntityRow::update_invoicing_entity_number(
conn,
Expand Down
85 changes: 53 additions & 32 deletions modules/meteroid/crates/meteroid-store/src/repositories/invoices.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,34 +151,17 @@ impl InvoiceInterface for Store {
}

async fn insert_invoice(&self, invoice: InvoiceNew) -> StoreResult<Invoice> {
let mut conn = self.get_conn().await?;

insert_invoice(&mut conn, invoice).await
self.transaction(|conn| {
async move { insert_invoice_tx(self, conn, invoice).await }.scope_boxed()
})
.await
}

async fn insert_invoice_batch(&self, invoice: Vec<InvoiceNew>) -> StoreResult<Vec<Invoice>> {
let mut conn = self.get_conn().await?;

let insertable_invoice: Vec<InvoiceRowNew> = invoice
.into_iter()
.map(|c| c.try_into())
.collect::<Result<_, _>>()?;

let inserted: Vec<Invoice> =
InvoiceRow::insert_invoice_batch(&mut conn, insertable_invoice)
.await
.map_err(Into::<Report<StoreError>>::into)
.and_then(|v| {
v.into_iter()
.map(TryInto::try_into)
.collect::<Result<Vec<_>, Report<StoreError>>>()
})?;

for inv in &inserted {
process_mrr(inv, &mut conn).await?; // TODO batch
}

Ok(inserted)
self.transaction(|conn| {
async move { insert_invoice_batch_tx(self, conn, invoice).await }.scope_boxed()
})
.await
}

async fn update_invoice_external_status(
Expand Down Expand Up @@ -300,10 +283,15 @@ impl InvoiceInterface for Store {
.await
.map_err(Into::<Report<StoreError>>::into)?;

let final_invoice: DetailedInvoice = InvoiceRow::find_by_id(conn, tenant_id, id)
.await
.map_err(Into::into)
.and_then(|row| row.try_into())?;

self.internal
.insert_outbox_events_tx(
conn,
vec![OutboxEvent::invoice_finalized(tenant_id, id)],
vec![OutboxEvent::invoice_finalized(final_invoice.into())],
)
.await?;

Expand Down Expand Up @@ -581,16 +569,49 @@ async fn compute_invoice_patch(
}
}

pub async fn insert_invoice(conn: &mut PgConn, invoice: InvoiceNew) -> StoreResult<Invoice> {
let insertable_invoice: InvoiceRowNew = invoice.try_into()?;
pub async fn insert_invoice_tx(
store: &Store,
tx: &mut PgConn,
invoice: InvoiceNew,
) -> StoreResult<Invoice> {
insert_invoice_batch_tx(store, tx, vec![invoice])
.await?
.pop()
.ok_or(StoreError::InsertError.into())
}

let inserted: Invoice = insertable_invoice
.insert(conn)
async fn insert_invoice_batch_tx(
store: &Store,
tx: &mut PgConn,
invoice: Vec<InvoiceNew>,
) -> StoreResult<Vec<Invoice>> {
let insertable_invoice: Vec<InvoiceRowNew> = invoice
.into_iter()
.map(|c| c.try_into())
.collect::<Result<_, _>>()?;

let inserted: Vec<Invoice> = InvoiceRow::insert_invoice_batch(tx, insertable_invoice)
.await
.map_err(Into::<Report<StoreError>>::into)
.and_then(TryInto::try_into)?;
.and_then(|v| {
v.into_iter()
.map(TryInto::try_into)
.collect::<Result<Vec<_>, Report<StoreError>>>()
})?;

// TODO batch
for inv in &inserted {
process_mrr(inv, tx).await?;
let final_invoice: DetailedInvoice = InvoiceRow::find_by_id(tx, inv.tenant_id, inv.id)
.await
.map_err(Into::into)
.and_then(|row| row.try_into())?;

process_mrr(&inserted, conn).await?;
store
.internal
.insert_outbox_events_tx(tx, vec![OutboxEvent::invoice_created(final_invoice.into())])
.await?;
}

Ok(inserted)
}
Expand Down
22 changes: 15 additions & 7 deletions modules/meteroid/src/workers/kafka/outbox.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use chrono::{DateTime, Utc};
use meteroid_store::domain::outbox_event::{CustomerCreatedEvent, SubscriptionCreatedEvent};
use meteroid_store::domain::outbox_event::{CustomerEvent, InvoiceEvent, SubscriptionEvent};
use rdkafka::message::{BorrowedHeaders, BorrowedMessage, Headers};
use rdkafka::Message;
use serde::Deserialize;
Expand All @@ -16,10 +16,11 @@ pub struct OutboxEvent {

#[derive(Debug)]
pub enum EventType {
CustomerCreated(Box<CustomerCreatedEvent>),
InvoiceFinalized,
CustomerCreated(Box<CustomerEvent>),
InvoiceCreated(Box<InvoiceEvent>),
InvoiceFinalized(Box<InvoiceEvent>),
InvoicePdfRequested,
SubscriptionCreated(Box<SubscriptionCreatedEvent>),
SubscriptionCreated(Box<SubscriptionEvent>),
}

impl EventType {
Expand All @@ -31,13 +32,20 @@ impl EventType {

match event_type.as_str() {
"customer.created" => {
let payload = extract_payload::<CustomerCreatedEvent>(m).ok()??;
let payload = extract_payload::<CustomerEvent>(m).ok()??;
Some(Self::CustomerCreated(Box::new(payload)))
}
"invoice.finalized" => Some(Self::InvoiceFinalized),
"invoice.created" => {
let payload = extract_payload::<InvoiceEvent>(m).ok()??;
Some(Self::InvoiceCreated(Box::new(payload)))
}
"invoice.finalized" => {
let payload = extract_payload::<InvoiceEvent>(m).ok()??;
Some(Self::InvoiceFinalized(Box::new(payload)))
}
"invoice.pdf.requested" => Some(Self::InvoicePdfRequested),
"subscription.created" => {
let payload = extract_payload::<SubscriptionCreatedEvent>(m).ok()??;
let payload = extract_payload::<SubscriptionEvent>(m).ok()??;
Some(Self::SubscriptionCreated(Box::new(payload)))
}
_ => None,
Expand Down
2 changes: 1 addition & 1 deletion modules/meteroid/src/workers/kafka/pdf_renderer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ impl MessageHandler for PdfRendererHandler {
log::info!("Processing message: {:?}", event);

match event.event_type {
EventType::InvoiceFinalized | EventType::InvoicePdfRequested => {
EventType::InvoiceFinalized(_) | EventType::InvoicePdfRequested => {
let invoice_id: Uuid = parse_uuid(event.aggregate_id.as_str(), "aggregate_id")?;

let result = self.pdf_service.generate_pdfs(vec![invoice_id]).await;
Expand Down
6 changes: 5 additions & 1 deletion modules/meteroid/src/workers/kafka/processors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,11 @@ use meteroid_store::Store;
use std::sync::Arc;

pub async fn run_webhook_outbox_processor(kafka_config: &KafkaConnectionConfig, store: Arc<Store>) {
let topics = vec!["outbox.event.customer", "outbox.event.subscription"];
let topics = vec![
"outbox.event.customer",
"outbox.event.subscription",
"outbox.event.invoice",
];
let group_id = "webhook_outbox_processor";

let handler = Arc::new(WebhookHandler::new(store));
Expand Down
Loading

0 comments on commit bd8990c

Please sign in to comment.