Skip to content

Commit

Permalink
client: add subscription support
Browse files Browse the repository at this point in the history
  • Loading branch information
snarfed committed Aug 9, 2023
1 parent 109b8d8 commit 19cd0ef
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 38 deletions.
10 changes: 8 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,13 @@ output = client.com.example.my_query({'foo': 'bar'}, param_a=5)

Note that `-` characters in method NSIDs are converted to `_`s, eg the call above is for the method `com.example.my-query`.

[Event stream methods with type `subscription`](https://atproto.com/specs/event-stream) are generators that `yield` messages sent by the server. They take parameters as kwargs, but no positional `input`.

```
for msg in client.com.example.count(start=1, end=10):
print(msg['num'])
```


## Server

Expand Down Expand Up @@ -61,7 +68,7 @@ You can also register a method handler with [`Server.register`](https://lexrpc.r
server.register('com.example.my-query', my_query_handler)
```

[Event stream methods with type `subscription`](https://atproto.com/specs/event-stream) are generators that `yield` messages to send to the client. They take parameters as kwargs, but no positional `input`.
[Event stream methods with type `subscription`](https://atproto.com/specs/event-stream) should be generators that `yield` messages to send to the client. They take parameters as kwargs, but no positional `input`.

```
@server.method('com.example.count')
Expand Down Expand Up @@ -94,7 +101,6 @@ This configures the Flask app to serve the methods registered with the lexrpc se
* support record types, eg via type "ref" and ref field pointing to the nsid [example here](https://github.com/bluesky-social/atproto/blob/main/lexicons/app/bsky/graph/follow.json#L13), ref points to [`app.bsky.actor.ref`](https://github.com/bluesky-social/atproto/blob/main/lexicons/app/bsky/actor/ref.json). ref isn't documented yet though, and these lexicons also use a `defs` field, which isn't really documented either. [they plan to update the docs and specs soon.](https://github.com/bluesky-social/atproto/pull/409#issuecomment-1348766856)
* check out [atproto@63b9873bb1699b6bce54e7a8d3db2fcbd2cfc5ab](https://github.com/snarfed/atproto/commit/63b9873bb1699b6bce54e7a8d3db2fcbd2cfc5ab)!
* [extensions](https://atproto.com/guides/lexicon#extensibility). is there anything to do? ah, [they're currently TODO in the spec](https://atproto.com/specs/xrpc#todos).
* ["binary blob" support.](https://atproto.com/specs/xrpc) currently undefined ish? is it based on the `encoding` field?
* [authentication, currently TODO in the spec](https://atproto.com/specs/xrpc#todos)


Expand Down
65 changes: 46 additions & 19 deletions lexrpc/client.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
"""XRPC client implementation."""
"""XRPC client implementation.
TODO:
* asyncio support for subscription websockets
"""
import json
import logging

import requests
import simple_websocket

from .base import Base, NSID_SEGMENT_RE

Expand Down Expand Up @@ -64,15 +70,18 @@ def __getattr__(self, attr):

return getattr(super(), attr)

def call(self, nsid, input, **params):
def call(self, nsid, input=None, **params):
"""Makes a remote XRPC method call.
Args:
nsid: str, method NSID
input: dict, input body
input: dict, input body, optional for subscriptions
params: optional method parameters
Returns: decoded JSON object, or None if the method has no output
Returns:
For queries and procedures: decoded JSON object, or None if the method
has no output
For subscriptions: generator iterator of messages from server
Raises:
NotImplementedError
Expand All @@ -89,7 +98,9 @@ def call(self, nsid, input, **params):
self._maybe_validate(nsid, 'parameters', params)
params = self.encode_params(params)

self._maybe_validate(nsid, 'input', input)
type = self._get_def(nsid)['type']
if type == 'subscription':
self._maybe_validate(nsid, 'input', input)

headers = {
**self._headers,
Expand All @@ -98,17 +109,33 @@ def call(self, nsid, input, **params):

# run method
url = f'{self._address}/xrpc/{nsid}'
defn = self._get_def(nsid)
fn = requests.get if defn['type'] == 'query' else requests.post
logger.debug(f'Running {fn} {url} {input} {params} {headers}')
resp = fn(url, params=params, json=input if input else None, headers=headers)
logger.debug(f'Got: {resp}')
resp.raise_for_status()

output = None
content_type = resp.headers.get('Content-Type', '').split(';')[0]
if content_type == 'application/json' and resp.content:
output = resp.json()

self._maybe_validate(nsid, 'output', output)
return output
if params:
url += f'?{params}'

if type == 'subscription':
return self._subscribe(url)
else:
# query or procedure
fn = requests.get if type == 'query' else requests.post
logger.debug(f'Running {fn} {url} {input} {params} {headers}')
resp = fn(url, json=input if input else None, headers=headers)
logger.debug(f'Got: {resp}')
resp.raise_for_status()

output = None
content_type = resp.headers.get('Content-Type', '').split(';')[0]
if content_type == 'application/json' and resp.content:
output = resp.json()

self._maybe_validate(nsid, 'output', output)
return output

def _subscribe(self, url):
"""Connects to a subscription websocket, yields the returned messages."""
ws = simple_websocket.Client(url)

try:
while True:
yield json.loads(ws.receive())
except simple_websocket.ConnectionClosed as cc:
logger.debug(cc)
57 changes: 40 additions & 17 deletions lexrpc/tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

from jsonschema import ValidationError
import requests
import simple_websocket

from .lexicons import LEXICONS
from .. import Client
Expand All @@ -28,12 +29,32 @@ def response(body=None, status=200, headers=None):
return resp


class FakeWebsocketClient:
"""Fake of :class:`simple_websocket.Client`."""

def __init__(self, url):
FakeWebsocketClient.url = url

def send(self, msg):
self.sent.append(json.loads(msg))

def receive(self):
if not self.to_receive:
raise simple_websocket.ConnectionClosed(message='foo')

return json.dumps(self.to_receive.pop(0))


class ClientTest(TestCase):
maxDiff = None

def setUp(self):
self.client = Client('http://ser.ver', LEXICONS)

simple_websocket.Client = FakeWebsocketClient
FakeWebsocketClient.sent = []
FakeWebsocketClient.to_receive = []

@patch('requests.get')
def test_call(self, mock_get):
output = {'foo': 'asdf', 'bar': 3}
Expand All @@ -43,8 +64,7 @@ def test_call(self, mock_get):
self.assertEqual(output, got)

mock_get.assert_called_once_with(
'http://ser.ver/xrpc/io.example.query',
params='x=y',
'http://ser.ver/xrpc/io.example.query?x=y',
json=None,
headers={'Content-Type': 'application/json'},
)
Expand All @@ -58,8 +78,7 @@ def test_query(self, mock_get):
self.assertEqual(output, got)

mock_get.assert_called_once_with(
'http://ser.ver/xrpc/io.example.query',
params='x=y',
'http://ser.ver/xrpc/io.example.query?x=y',
json=None,
headers={'Content-Type': 'application/json'},
)
Expand All @@ -74,8 +93,7 @@ def test_procedure(self, mock_post):
self.assertEqual(output, got)

mock_post.assert_called_once_with(
'http://ser.ver/xrpc/io.example.procedure',
params='x=y',
'http://ser.ver/xrpc/io.example.procedure?x=y',
json=input,
headers={'Content-Type': 'application/json'},
)
Expand All @@ -89,8 +107,7 @@ def test_boolean_param(self, mock_get):
self.assertEqual(output, got)

mock_get.assert_called_once_with(
'http://ser.ver/xrpc/io.example.query',
params='z=true',
'http://ser.ver/xrpc/io.example.query?z=true',
json=None,
headers={'Content-Type': 'application/json'},
)
Expand All @@ -106,7 +123,6 @@ def test_no_output_error(self, mock_get):

mock_get.assert_called_once_with(
'http://ser.ver/xrpc/io.example.query',
params='',
json=None,
headers={'Content-Type': 'application/json'},
)
Expand All @@ -118,7 +134,6 @@ def test_no_params_input_output(self, mock_post):

mock_post.assert_called_once_with(
'http://ser.ver/xrpc/io.example.noParamsInputOutput',
params='',
json=None,
headers={'Content-Type': 'application/json'},
)
Expand All @@ -130,7 +145,6 @@ def test_dashed_name(self, mock_post):

mock_post.assert_called_once_with(
'http://ser.ver/xrpc/io.example.dashed-name',
params='',
json=None,
headers={'Content-Type': 'application/json'},
)
Expand All @@ -143,7 +157,6 @@ def test_defs(self, mock_get):

mock_get.assert_called_once_with(
'http://ser.ver/xrpc/io.example.defs',
params='',
json={'in': 'bar'},
headers={'Content-Type': 'application/json'},
)
Expand All @@ -170,12 +183,24 @@ def test_array(self, mock_post):
self.assertEqual(['z'], self.client.io.example.array({}, foo=['a', 'b']))

mock_post.assert_called_once_with(
'http://ser.ver/xrpc/io.example.array',
params='foo=a&foo=b',
'http://ser.ver/xrpc/io.example.array?foo=a&foo=b',
json=None,
headers={'Content-Type': 'application/json'},
)

def test_subscription(self):
msgs = [
{'num': 3},
{'num': 4},
{'num': 5},
]
FakeWebsocketClient.to_receive = list(msgs)

gen = self.client.io.example.subscribe(start=3, end=6)
self.assertEqual(msgs, list(gen))
self.assertEqual('http://ser.ver/xrpc/io.example.subscribe?start=3&end=6',
FakeWebsocketClient.url)

@patch('requests.post')
def test_validate_false(self, mock_post):
client = Client('http://ser.ver', LEXICONS, validate=False)
Expand All @@ -189,7 +214,6 @@ def test_validate_false(self, mock_post):

mock_post.assert_called_once_with(
'http://ser.ver/xrpc/io.example.procedure',
params='',
json=input,
headers={'Content-Type': 'application/json'},
)
Expand All @@ -204,8 +228,7 @@ def test_headers(self, mock_get):
self.assertEqual(output, got)

mock_get.assert_called_once_with(
'http://ser.ver/xrpc/io.example.query',
params='x=y',
'http://ser.ver/xrpc/io.example.query?x=y',
json=None,
headers={
'Content-Type': 'application/json',
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ keywords = ['XRPC', 'Lexicon', 'AT Protocol', 'ATP']
dependencies = [
'jsonschema>=4.0',
'requests>=2.0',
'simple-websocket',
]
classifiers = [
'Programming Language :: Python :: 3',
Expand Down

0 comments on commit 19cd0ef

Please sign in to comment.