Skip to content

Commit

Permalink
atproto.poll_notifications: generate and send AppView JWT
Browse files Browse the repository at this point in the history
  • Loading branch information
snarfed committed Sep 18, 2023
1 parent dcbf16d commit 7c35fe4
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 8 deletions.
22 changes: 14 additions & 8 deletions atproto.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from arroba.datastore_storage import AtpRepo, DatastoreStorage
from arroba.repo import Repo, Write
from arroba.storage import Action, CommitData
from arroba.util import next_tid, parse_at_uri
from arroba.util import next_tid, parse_at_uri, service_jwt
from flask import abort, g, request
from google.cloud import ndb
from granary import as1, bluesky
Expand Down Expand Up @@ -318,12 +318,13 @@ def serve(cls, obj):
def poll_notifications():
"""Fetches and enqueueus new activities from the AppView for our users.
Uses the `listNotifications` endpoint, which is intended more for end users. 🤷
Uses the `listNotifications` endpoint, which is intended for end users. 🤷
"""
repo_dids = [key.id() for key in AtpRepo.query().iter(keys_only=True)]
logger.info(f'Got {len(repo_dids)} repo DIDs')
repos = {r.key.id(): r for r in AtpRepo.query()}
logger.info(f'Got {len(repos)} repos')

users = itertools.chain(*(cls.query(cls.atproto_did.IN(repo_dids))
repo_dids = []
users = itertools.chain(*(cls.query(cls.atproto_did.IN(list(repos)))
for cls in set(PROTOCOLS.values())
if cls and cls != ATProto))

Expand All @@ -333,11 +334,12 @@ def poll_notifications():

for user in users:
# TODO: store and use cursor
# TODO: user JWT
repo = repos[user.atproto_did]
client.access_token = service_jwt(os.environ['APPVIEW_HOST'],
repo_did=user.atproto_did,
privkey=repo.signing_key)
resp = client.app.bsky.notification.listNotifications()
for notif in resp['notifications']:
# TODO: load Object with id notif.uri, skip if it already exists?
# transactional with enqueueing receive task
logger.info(f'Got {notif["reason"]} from {notif["author"]["handle"]} {notif["uri"]} {notif["cid"]}')

# TODO: verify sig
Expand All @@ -348,6 +350,10 @@ def poll_notifications():
add(obj.notify, user.key)
obj.put()

# TODO: do we need to Object.load() the source user (actor) here? or
# does Protocol.receive() (or later, eg send/serve) do that
# automatically?

common.create_task(queue='receive', key=obj.key.urlsafe())

return 'OK'
3 changes: 3 additions & 0 deletions tests/test_atproto.py
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,9 @@ def test_poll_notifications(self, mock_get, mock_create_task):
resp = client.get('/_ah/queue/atproto-poll-notifs')
self.assertEqual(200, resp.status_code)

# just check that access token was set, then remove it before comparing
for call in mock_get.call_args_list:
assert call.kwargs['headers'].pop('Authorization')
self.assertEqual([call(
'https://api.bsky-sandbox.dev/xrpc/app.bsky.notification.listNotifications',
json=None,
Expand Down

0 comments on commit 7c35fe4

Please sign in to comment.