Skip to content

Commit

Permalink
atproto: start on polling notifications
Browse files Browse the repository at this point in the history
lots more to do here, 8 new TODOs in this commit 😆
  • Loading branch information
snarfed committed Sep 14, 2023
1 parent afda648 commit d3b3ff4
Show file tree
Hide file tree
Showing 4 changed files with 170 additions and 10 deletions.
55 changes: 49 additions & 6 deletions atproto.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,22 @@
https://atproto.com/
"""
import json
import itertools
import logging
import os
import re

from arroba import did
from arroba.datastore_storage import DatastoreStorage
from arroba.datastore_storage import AtpRepo, DatastoreStorage
from arroba.repo import Repo, Write
from arroba.storage import Action, CommitData
from arroba.util import next_tid, parse_at_uri
from flask import abort, g, request
from google.cloud import ndb
from granary import as1, bluesky
from lexrpc import Client
from oauth_dropins.webutil import flask_util, util
from oauth_dropins.webutil import util
import requests
from urllib.parse import urljoin, urlparse

import common
from common import (
Expand All @@ -26,8 +26,9 @@
error,
USER_AGENT,
)
from flask_app import app, cache
from models import Follower, Object, PROTOCOLS, User
import flask_app
import hub
from models import Object, PROTOCOLS, User
from protocol import Protocol

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -289,6 +290,7 @@ def fetch(cls, obj, **kwargs):
client = Client(pds, headers={'User-Agent': USER_AGENT})
obj.bsky = client.com.atproto.repo.getRecord(
repo=repo, collection=collection, rkey=rkey)
# TODO: verify sig?
return True

@classmethod
Expand All @@ -301,3 +303,44 @@ def serve(cls, obj):
/convert/... HTTP requests, so this should never be used in practice.
"""
return bluesky.from_as1(obj.as1), {'Content-Type': 'application/json'}


@hub.app.get('/_ah/queue/atproto-poll-notifs')
def poll_notifications():
"""Fetches and enqueueus new activities from the AppView for our users.
Uses the `listNotifications` endpoint, which is intended more for end users. 🤷
"""
repo_dids = [key.id() for key in AtpRepo.query().iter(keys_only=True)]
logger.info(f'Got {len(repo_dids)} repo DIDs')

users = itertools.chain(*(cls.query(cls.atproto_did.IN(repo_dids))
for cls in set(PROTOCOLS.values())
if cls and cls != ATProto))

# TODO: convert to Session for connection pipelining!
client = Client(f'https://{os.environ["APPVIEW_HOST"]}',
headers={'User-Agent': USER_AGENT})

for user in users:
# TODO: store and use cursor
# TODO: user JWT
resp = client.app.bsky.notification.listNotifications()
for notif in resp['notifications']:
# TODO: load Object with id notif.uri, skip if it already exists?
# transactional with enqueueing receive task
logger.info(f'Got {notif["reason"]} from {notif["author"]["handle"]} {notif["uri"]} {notif["cid"]}')

# TODO: verify sig
# TODO: if notif.uri has handle, resolve and replace with DID
# ...but that's probably unlikely, at least coming from AppView?
obj = Object.get_or_create(id=notif['uri'], bsky=notif['record'],
source_protocol=ATProto.ABBREV)
if not obj.status:
obj.status = 'new'
add(obj.notify, user.key)
obj.put()

common.create_task(queue='receive', key=obj.key.urlsafe())

return 'OK'
7 changes: 5 additions & 2 deletions hub.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,15 @@
)
import requests

import atproto # atproto-poll-notifs task handler
from common import USER_AGENT

util.set_user_agent(USER_AGENT)
import models

logger = logging.getLogger(__name__)

util.set_user_agent(USER_AGENT)

models.reset_protocol_properties()

#
# Flask app
Expand Down
2 changes: 1 addition & 1 deletion models.py
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ class Object(StringIdModel):
domains = ndb.StringProperty(repeated=True)

status = ndb.StringProperty(choices=STATUSES)
# choices is populated in flask_app, after all User subclasses are created,
# choices is populated in app, after all User subclasses are created,
# so that PROTOCOLS is fully populated
# TODO: remove? is this redundant with the protocol-specific data fields below?
source_protocol = ndb.StringProperty(choices=[])
Expand Down
116 changes: 115 additions & 1 deletion tests/test_atproto.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import protocol
from .testutil import Fake, TestCase

from hub import app

DID_DOC = {
'id': 'did:plc:foo',
'alsoKnownAs': ['at://han.dull'],
Expand All @@ -46,6 +48,9 @@
}],
}

KEY = arroba.util.new_key()


class ATProtoTest(TestCase):

def setUp(self):
Expand Down Expand Up @@ -294,7 +299,7 @@ def test_send_existing_repo(self, mock_create_task):
did_doc = copy.deepcopy(DID_DOC)
did_doc['service'][0]['serviceEndpoint'] = 'http://localhost/'
self.store_object(id='did:plc:foo', raw=did_doc)
Repo.create(self.storage, 'did:plc:foo', signing_key=arroba.util.new_key())
Repo.create(self.storage, 'did:plc:foo', signing_key=KEY)

obj = self.store_object(id='fake:post', source_protocol='fake', our_as1={
**POST_AS,
Expand Down Expand Up @@ -329,3 +334,112 @@ def test_send_did_doc_not_our_repo(self, mock_create_task):
self.assertEqual(0, AtpBlock.query().count())
self.assertEqual(0, AtpRepo.query().count())
mock_create_task.assert_not_called()

@patch.object(tasks_client, 'create_task', return_value=Task(name='my task'))
@patch('requests.get')
def test_poll_notifications(self, mock_get, mock_create_task):
self.make_user(id='fake:user-a', cls=Fake, atproto_did=f'did:plc:a')
self.make_user(id='fake:user-c', cls=Fake, atproto_did=f'did:plc:b')
self.make_user(id='fake:user-b', cls=Fake, atproto_did=f'did:plc:c')

Repo.create(self.storage, 'did:plc:a', signing_key=KEY)
Repo.create(self.storage, 'did:plc:c', signing_key=KEY)

like = {
'$type': 'app.bsky.feed.like',
'subject': {
'cid': '...',
'uri': 'at://did:plc:a/app.bsky.feed.post/999',
},
}
reply = {
'$type': 'app.bsky.feed.post',
'text': 'I hereby reply',
'reply': {
'root': {
'cid': '...',
'uri': 'at://did:plc:a/app.bsky.feed.post/987',
},
'parent': {
'cid': '...',
'uri': 'at://did:plc:a/app.bsky.feed.post/987',
}
},
}
follow = {
'$type': 'app.bsky.graph.follow',
'subject': 'did:plc:c',
}
eve = {
'$type': 'app.bsky.actor.defs#profileView',
'did': 'did:plc:eve',
'handle': 'eve.com',
}
alice = {
'$type': 'app.bsky.actor.defs#profileView',
'did': 'did:plc:a',
'handle': 'alice',
}

mock_get.side_effect = [
requests_response({
'cursor': '...',
'notifications': [{
# TODO
# 'uri': 'at://did:plc:d/app.bsky.feed.like/123',
# 'cid': '...',
# 'author': eve,
# 'record': like,
# 'reason': 'like',
# }, {
'uri': 'at://did:plc:d/app.bsky.feed.post/456',
'cid': '...',
'author': eve,
'record': reply,
'reason': 'reply',
}],
}),
requests_response({
'cursor': '...',
'notifications': [{
'uri': 'at://did:plc:d/app.bsky.graph.follow/789',
'cid': '...',
'author': alice,
'record': follow,
'reason': 'follow',
}],
}),
]

client = app.test_client()
resp = client.get('/_ah/queue/atproto-poll-notifs')
self.assertEqual(200, resp.status_code)

self.assertEqual([call(
'https://api.bsky-sandbox.dev/xrpc/app.bsky.notification.listNotifications',
json=None,
headers={
'Content-Type': 'application/json',
'User-Agent': common.USER_AGENT,
},
)] * 2, mock_get.call_args_list)

# TODO: to convert like back to AS1, we need some mapping from the
# original post's URI/CID to its original non-ATP URL, right? add a new
# AS1 field? store it in datastore?
# ANSWER: add `copies` repeated Target property to Object to map
#
# like_obj = Object.get_by_id('at://did:plc:d/app.bsky.feed.like/123')
# self.assertEqual(like, like_obj.bsky)
# self.assert_task(mock_create_task, 'receive', '/_ah/queue/receive',
# key=like.key.urlsafe())

reply_obj = Object.get_by_id('at://did:plc:d/app.bsky.feed.post/456')
self.assertEqual(reply, reply_obj.bsky)
self.assert_task(mock_create_task, 'receive', '/_ah/queue/receive',
key=reply_obj.key.urlsafe())

follow_obj = Object.get_by_id('at://did:plc:d/app.bsky.graph.follow/789')
self.assertEqual(follow, follow_obj.bsky)
self.assert_task(mock_create_task, 'receive', '/_ah/queue/receive',
key=follow_obj.key.urlsafe())

0 comments on commit d3b3ff4

Please sign in to comment.