From 7c35fe4b1b2305c1942bd523eea4aa09ab1a46a6 Mon Sep 17 00:00:00 2001 From: Ryan Barrett Date: Mon, 18 Sep 2023 15:04:00 -0700 Subject: [PATCH] atproto.poll_notifications: generate and send AppView JWT --- atproto.py | 22 ++++++++++++++-------- tests/test_atproto.py | 3 +++ 2 files changed, 17 insertions(+), 8 deletions(-) diff --git a/atproto.py b/atproto.py index c88e996c..8088bf45 100644 --- a/atproto.py +++ b/atproto.py @@ -11,7 +11,7 @@ from arroba.datastore_storage import AtpRepo, DatastoreStorage from arroba.repo import Repo, Write from arroba.storage import Action, CommitData -from arroba.util import next_tid, parse_at_uri +from arroba.util import next_tid, parse_at_uri, service_jwt from flask import abort, g, request from google.cloud import ndb from granary import as1, bluesky @@ -318,12 +318,13 @@ def serve(cls, obj): def poll_notifications(): """Fetches and enqueueus new activities from the AppView for our users. - Uses the `listNotifications` endpoint, which is intended more for end users. 🤷 + Uses the `listNotifications` endpoint, which is intended for end users. 🤷 """ - repo_dids = [key.id() for key in AtpRepo.query().iter(keys_only=True)] - logger.info(f'Got {len(repo_dids)} repo DIDs') + repos = {r.key.id(): r for r in AtpRepo.query()} + logger.info(f'Got {len(repos)} repos') - users = itertools.chain(*(cls.query(cls.atproto_did.IN(repo_dids)) + repo_dids = [] + users = itertools.chain(*(cls.query(cls.atproto_did.IN(list(repos))) for cls in set(PROTOCOLS.values()) if cls and cls != ATProto)) @@ -333,11 +334,12 @@ def poll_notifications(): for user in users: # TODO: store and use cursor - # TODO: user JWT + repo = repos[user.atproto_did] + client.access_token = service_jwt(os.environ['APPVIEW_HOST'], + repo_did=user.atproto_did, + privkey=repo.signing_key) resp = client.app.bsky.notification.listNotifications() for notif in resp['notifications']: - # TODO: load Object with id notif.uri, skip if it already exists? - # transactional with enqueueing receive task logger.info(f'Got {notif["reason"]} from {notif["author"]["handle"]} {notif["uri"]} {notif["cid"]}') # TODO: verify sig @@ -348,6 +350,10 @@ def poll_notifications(): add(obj.notify, user.key) obj.put() + # TODO: do we need to Object.load() the source user (actor) here? or + # does Protocol.receive() (or later, eg send/serve) do that + # automatically? + common.create_task(queue='receive', key=obj.key.urlsafe()) return 'OK' diff --git a/tests/test_atproto.py b/tests/test_atproto.py index 645ed595..08c3b579 100644 --- a/tests/test_atproto.py +++ b/tests/test_atproto.py @@ -427,6 +427,9 @@ def test_poll_notifications(self, mock_get, mock_create_task): resp = client.get('/_ah/queue/atproto-poll-notifs') self.assertEqual(200, resp.status_code) + # just check that access token was set, then remove it before comparing + for call in mock_get.call_args_list: + assert call.kwargs['headers'].pop('Authorization') self.assertEqual([call( 'https://api.bsky-sandbox.dev/xrpc/app.bsky.notification.listNotifications', json=None,