Skip to content

Commit

Permalink
Merge pull request #45 from rheinwerk-verlag/feature/contact_endpoints
Browse files Browse the repository at this point in the history
Contact endpoints and tests
  • Loading branch information
jpetrucciani authored Jul 1, 2019
2 parents 992ee03 + b8e1c34 commit f52da5a
Show file tree
Hide file tree
Showing 5 changed files with 392 additions and 178 deletions.
4 changes: 2 additions & 2 deletions hubspot3/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,13 +174,13 @@ def _execute_request(self, conn, request):

def _digest_result(self, data):
if data:
if isinstance(data, bytes):
data = utils.force_utf8(data)
if isinstance(data, str):
try:
data = json.loads(data)
except ValueError:
pass
elif isinstance(data, bytes):
data = json.loads(utils.force_utf8(data))

return data

Expand Down
147 changes: 104 additions & 43 deletions hubspot3/contacts.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
"""
hubspot contacts api
"""
import warnings
from typing import Union

from hubspot3 import logging_helper
from hubspot3.base import BaseClient
from hubspot3.utils import prettify
from typing import Union


CONTACTS_API_VERSION = "1"

Expand All @@ -30,30 +31,35 @@ def __init__(self, *args, **kwargs):
def _get_path(self, subpath):
return "contacts/v{}/{}".format(CONTACTS_API_VERSION, subpath)

def create_or_update_a_contact(self, email, data=None, **options):
"""Creates or Updates a client with the supplied data."""
data = data or {}
def get_by_id(self, contact_id: str, **options):
"""Get contact specified by ID"""
return self._call(
"contact/createOrUpdate/email/{email}".format(email=email),
data=data,
method="POST",
**options
"contact/vid/{}/profile".format(contact_id), method="GET", **options
)

def get_contact_by_email(self, email, **options):
"""Gets contact specified by email address."""
def get_by_email(self, email, **options):
"""Get contact specified by email address."""
return self._call(
"contact/email/{email}/profile".format(email=email), method="GET", **options
)

def get_contact_by_id(self, contact_id: str, **options):
"""Gets contact specified by ID"""
def create(self, data=None, **options):
"""create a contact"""
data = data or {}
return self._call("contact", data=data, method="POST", **options)

def create_or_update_by_email(self, email, data=None, **options):
"""Create or Updates a client with the supplied data."""
data = data or {}
return self._call(
"contact/vid/{}/profile".format(contact_id), method="GET", **options
"contact/createOrUpdate/email/{email}".format(email=email),
data=data,
method="POST",
**options
)

def update_a_contact(self, contact_id, data=None, **options):
"""Updates the contact by contact_id with the given data."""
def update_by_id(self, contact_id, data=None, **options):
"""Update the contact by contact_id with the given data."""
data = data or {}
return self._call(
"contact/vid/{contact_id}/profile".format(contact_id=contact_id),
Expand All @@ -62,47 +68,54 @@ def update_a_contact(self, contact_id, data=None, **options):
**options
)

def delete_a_contact(self, contact_id: str, **options):
"""Deletes a contact by contact_id."""
def update_by_email(self, email: str, data=None, **options):
"""update the concat for the given email address with the given data"""
data = data or {}

return self._call(
"contact/email/{}/profile".format(email),
data=data,
method="POST",
**options
)

def delete_by_id(self, contact_id: str, **options):
"""Delete a contact by contact_id."""
return self._call(
"contact/vid/{contact_id}".format(contact_id=contact_id),
method="DELETE",
**options
)

def create(self, data=None, **options):
"""create a contact"""
data = data or {}
return self._call("contact", data=data, method="POST", **options)
def merge(self, primary_id: int, secondary_id: int, **options):
"""merge the data from the secondary_id into the data of the primary_id"""
data = dict(vidToMerge=secondary_id)

def update(self, contact_id: str, data=None, **options):
"""update the given vid with the given data"""
if not data:
data = {}

return self._call(
"contact/vid/{}/profile".format(contact_id),
self._call(
"contact/merge-vids/{}/".format(primary_id),
data=data,
method="POST",
**options
)

default_batch_properties = [
"email",
"firstname",
"lastname",
"company",
"website",
"phone",
"address",
"city",
"state",
"zip",
"associatedcompanyid",
]

def get_batch(self, ids, extra_properties: Union[list, str] = None):
"""given a batch of vids, get more of their info"""
# default properties to fetch
properties = [
"email",
"firstname",
"lastname",
"company",
"website",
"phone",
"address",
"city",
"state",
"zip",
"associatedcompanyid",
]
properties = self.default_batch_properties

# append extras if they exist
if extra_properties:
Expand Down Expand Up @@ -163,7 +176,7 @@ def _get_recent(
**options
):
"""
returns a list of either recently created or recently modified/created contacts
return a list of either recently created or recently modified/created contacts
"""
finished = False
output = []
Expand Down Expand Up @@ -209,3 +222,51 @@ def get_recently_modified(self, limit: int = 100):
:see: https://developers.hubspot.com/docs/methods/contacts/get_recently_updated_contacts
"""
return self._get_recent(ContactsClient.Recency.MODIFIED, limit=limit)

def get_contact_by_id(self, contact_id: str, **options):
warnings.warn(
"ContactsClient.get_contact_by_id is deprecated in favor of "
"ContactsClient.get_by_id",
DeprecationWarning,
)
return self.get_by_id(contact_id, **options)

def get_contact_by_email(self, email, **options):
warnings.warn(
"ContactsClient.get_contact_by_email is deprecated in favor of "
"ContactsClient.get_by_email",
DeprecationWarning,
)
return self.get_by_email(email, **options)

def create_or_update_a_contact(self, email, data=None, **options):
warnings.warn(
"ContactsClient.create_or_update_a_contact is deprecated in favor of "
"ContactsClient.create_or_update_by_email",
DeprecationWarning,
)
return self.create_or_update_by_email(email, data, **options)

def update(self, contact_id: str, data=None, **options):
warnings.warn(
"ContactsClient.update is deprecated in favor of "
"ContactsClient.update_by_id",
DeprecationWarning,
)
return self.update_by_id(contact_id, data, **options)

def update_a_contact(self, contact_id, data=None, **options):
warnings.warn(
"ContactsClient.update_a_contact is deprecated in favor of "
"ContactsClient.update_by_id",
DeprecationWarning,
)
return self.update_by_id(contact_id, data, **options)

def delete_a_contact(self, contact_id: str, **options):
warnings.warn(
"ContactsClient.delete_a_contact is deprecated in favor of "
"ContactsClient.delete_by_id",
DeprecationWarning,
)
return self.delete_by_id(contact_id, **options)
42 changes: 15 additions & 27 deletions hubspot3/test/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,41 +23,29 @@ def assert_num_requests(number):

connection.assert_num_requests = assert_num_requests

def assert_has_request(method, url, data=None):
def assert_has_request(method, url, data=None, **params):
"""
Assert that at least one request with the exact combination of method, URL and body data
was performed.
Assert that at least one request with the exact combination of method, URL, body data, and
query parameters was performed.
"""
data = json.dumps(data) if data else None
for args, kwargs in connection.request.call_args_list:
if args[0] == method and args[1] == url and args[2] == data:
break
else:
raise AssertionError(
"No {method} request to URL '{url}' with data '{data}' was performed.'".format(
method=method, url=url, data=data
)
)

connection.assert_has_request = assert_has_request

def assert_query_parameters_in_request(**params):
"""
Assert that at least one request using all of the given query parameters was performed.
"""
for args, kwargs in connection.request.call_args_list:
url = args[1]
if all(
urlencode({name: value}, doseq=True) in url
url_check = args[1] == url if not params else args[1].startswith(url)
params_check = all(
urlencode({name: value}, doseq=True) in args[1]
for name, value in params.items()
):
)
if args[0] == method and url_check and args[2] == data and params_check:
break
else:
raise AssertionError(
"No request contains all given query parameters: {}".format(params)
(
"No {method} request to URL '{url}' with data '{data}' and with parameters "
"'{params}' was performed.'"
).format(method=method, url=url, data=data, params=params)
)

connection.assert_query_parameters_in_request = assert_query_parameters_in_request
connection.assert_has_request = assert_has_request

def set_response(status_code, body):
"""Set the response status code and body for all mocked requests."""
Expand All @@ -70,8 +58,8 @@ def set_response(status_code, body):
def set_responses(response_tuples):
"""
Set multiple responses for consecutive mocked requests via tuples of (status code, body).
The first request will result in the first response, the second request in the second
response and so on.
The first request will result in the first response, the second request in the
second response and so on.
"""
responses = []
for status_code, body in response_tuples:
Expand Down
Loading

0 comments on commit f52da5a

Please sign in to comment.