Skip to content

Commit

Permalink
Move helper functions for stripe customer to separate file and implem…
Browse files Browse the repository at this point in the history
…ent test
  • Loading branch information
BerglundDaniel committed Nov 30, 2023
1 parent e1391c9 commit 498a6b1
Show file tree
Hide file tree
Showing 4 changed files with 187 additions and 7 deletions.
116 changes: 116 additions & 0 deletions api/src/shop/stripe_customer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
from logging import getLogger
from typing import Any, Dict, Optional

import stripe

from stripe.error import InvalidRequestError
from shop.stripe_util import retry, are_metadata_dicts_equivalent
from service.db import db_session
from service.error import NotFound, InternalServerError
from membership.models import Member
from shop.stripe_constants import (
MakerspaceMetadataKeys as MSMetaKeys,
)

logger = getLogger("makeradmin")


def _get_metadata_for_stripe_customer(makeradmin_member: Member) -> Dict[str, Any]:
return {
MSMetaKeys.PENDING_MEMBER.value: "pending" if makeradmin_member.pending_activation else "",
MSMetaKeys.USER_ID.value: makeradmin_member.member_id,
MSMetaKeys.MEMBER_NUMBER.value: makeradmin_member.member_number,
}


def get_stripe_customer(makeradmin_member: Member) -> stripe.Customer | None:
try:
customer = retry(lambda: stripe.Customer.retrieve(makeradmin_member.stripe_customer_id))
except InvalidRequestError as e:
logger.warning(
f"failed to retrive customer from stripe for makeradmin member with id {makeradmin_member.member_id}, {e}"
)
return None
if hasattr(customer, "deleted") and customer.deleted:
raise InvalidRequestError(f"Customer for member with id {makeradmin_member.member_id} has been deleted")
return customer


def eq_makeradmin_stripe_customer(makeradmin_member: Member, stripe_customer: stripe.Customer) -> bool:
"""Check that the essential parts of the product are the same in both makeradmin and stripe"""
member_email = makeradmin_member.email.strip()
expected_metadata = _get_metadata_for_stripe_customer(makeradmin_member)
return (
are_metadata_dicts_equivalent(stripe_customer.metadata, expected_metadata)
and stripe_customer.email == member_email
)


def _create_stripe_customer(
makeradmin_member: Member, test_clock: Optional[stripe.test_helpers.TestClock] = None
) -> stripe.Customer:
expected_metadata = _get_metadata_for_stripe_customer(makeradmin_member)
customer = retry(
lambda: stripe.Customer.create(
description=f"Created by Makeradmin (#{makeradmin_member.member_number})",
email=makeradmin_member.email.strip(),
name=f"{makeradmin_member.firstname} {makeradmin_member.lastname}",
metadata=expected_metadata,
test_clock=test_clock,
)
)
return customer


def get_or_create_stripe_customer(
makeradmin_member: Member, test_clock: Optional[stripe.test_helpers.TestClock] = None
) -> stripe.Customer:
if makeradmin_member.stripe_customer_id:
stripe_customer = get_stripe_customer(makeradmin_member)
if stripe_customer is None:
stripe_customer = _create_stripe_customer(makeradmin_member, test_clock)
return stripe_customer


def get_and_sync_stripe_customer(
makeradmin_member: Member, test_clock: Optional[stripe.test_helpers.TestClock] = None
) -> stripe.Customer:
try:
stripe_customer = get_or_create_stripe_customer(makeradmin_member, test_clock)
if not eq_makeradmin_stripe_customer(makeradmin_member, stripe_customer):
stripe_customer = update_stripe_customer(makeradmin_member)

# Note: Stripe doesn't update its search index of customers immediately,
# so the new customer may not be visible to stripe.Customer.search for a few seconds.
# Therefore we always try to find the customer by its ID, that we store in the database
makeradmin_member.stripe_customer_id = stripe_customer.stripe_id
db_session.flush()
return stripe_customer
except Exception as e:
raise InternalServerError(f"Failed to sync stripe customer for member {makeradmin_member}. Exception {e}")


def update_stripe_customer(makeradmin_member: Member) -> stripe.Customer:
"""Update the stripe product to match the makeradmin product"""
expected_metadata = _get_metadata_for_stripe_customer(makeradmin_member)
return retry(
lambda: stripe.Customer.modify(
makeradmin_member.stripe_customer_id,
description=f"Created by Makeradmin (#{makeradmin_member.member_number})",
email=makeradmin_member.email.strip(),
name=f"{makeradmin_member.firstname} {makeradmin_member.lastname}",
metadata=expected_metadata,
)
)


def delete_stripe_customer(member_id: int) -> None:
member = db_session.query(Member).get(member_id)
if member is None:
raise NotFound(f"Unable to find member with id {member_id}")
if member.stripe_customer_id:
# Note: This will also delete all subscriptions
stripe.Customer.delete(member.stripe_customer_id)

member.stripe_customer_id = None
db_session.flush()
8 changes: 1 addition & 7 deletions api/src/shop/stripe_subscriptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@

from datetime import datetime, timezone, date, time, timedelta
from stripe.error import InvalidRequestError
from shop.stripe_util import retry
from shop.stripe_util import retry, are_metadata_dicts_equivalent
from basic_types.enums import PriceLevel
from shop.stripe_discounts import get_discount_for_product, get_price_level_for_member
from shop.models import Product, ProductAction, ProductCategory
Expand Down Expand Up @@ -232,12 +232,6 @@ def delete_stripe_customer(member_id: int) -> None:
db_session.flush()


def are_metadata_dicts_equivalent(a: Dict[str, Any], b: Dict[str, Any]) -> bool:
a = {k: v for k, v in a.items() if v != ""}
b = {k: v for k, v in b.items() if v != ""}
return a == b


def get_stripe_customer(
member_info: Member, test_clock: Optional[stripe.test_helpers.TestClock]
) -> Optional[stripe.Customer]:
Expand Down
6 changes: 6 additions & 0 deletions api/src/shop/stripe_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@
}


def are_metadata_dicts_equivalent(a: Dict[str, Any], b: Dict[str, Any]) -> bool:
a = {k: v for k, v in a.items() if v != ""}
b = {k: v for k, v in b.items() if v != ""}
return a == b


def get_subscription_category() -> ProductCategory:
offset = 0
while True:
Expand Down
64 changes: 64 additions & 0 deletions api/src/shop/test/stripe_customer_test
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
from logging import getLogger
from typing import Any, Dict, List
from unittest import skipIf

import membership.models
import shop.models
import messages.models
import core.models
from shop.stripe_customer import (
get_or_create_stripe_customer,
get_and_sync_stripe_customer,
delete_stripe_customer,
eq_makeradmin_stripe_customer,
)
from shop.stripe_util import are_metadata_dicts_equivalent
from shop.stripe_constants import (
MakerspaceMetadataKeys as MSMetaKeys,
)
from membership.models import Member
import stripe
from test_aid.test_base import FlaskTestBase, ShopTestMixin

logger = getLogger("makeradmin")


class StripeCustomerTest(ShopTestMixin, FlaskTestBase):
# The products id in makeradmin have to be unique in each test to prevent race conditions
# Some of the tests here will generate new objects in stripe. They are all ran in test mode
# You can clear the test area in stripe's developer dashboard.

models = [membership.models, messages.models, shop.models, core.models]

@skipIf(not stripe.api_key, "stripe util tests require stripe api key in .env file")
def setUp(self) -> None:
self.seen_members: List[Member] = []

def tearDown(self) -> None:
# It is not possible to delete prices through the api so we set them as inactive instead
for makeradmin_member in self.seen_members:
delete_stripe_customer(makeradmin_member.member_id)
return super().tearDown()

@staticmethod
def assertCustomer(makeradmin_member: Member, stripe_test_customer: stripe.Customer) -> None:
assert stripe_test_customer
assert stripe_test_customer.email == makeradmin_member.email.strip()
expected_metadata = {
# Setting to an empty string will delete the key if present
MSMetaKeys.PENDING_MEMBER.value: "pending" if makeradmin_member.pending_activation else "",
MSMetaKeys.USER_ID.value: makeradmin_member.member_id,
MSMetaKeys.MEMBER_NUMBER.value: makeradmin_member.member_number,
}
assert are_metadata_dicts_equivalent(stripe_test_customer.metadata, expected_metadata)

def test_create_customer(self) -> None:
makeradmin_test_member = self.db.create_member(
first_name="test customer simple",
email="[email protected]",
)
self.seen_members.append(makeradmin_test_member)
stripe_test_customer = get_or_create_stripe_customer(makeradmin_test_member)
self.assertCustomer(makeradmin_test_member, stripe_test_customer)

# TODO more tests

0 comments on commit 498a6b1

Please sign in to comment.