Skip to content

Commit

Permalink
Refactor transport processors
Browse files Browse the repository at this point in the history
  • Loading branch information
alecmev committed Aug 10, 2022
1 parent 6ab3fca commit dd1fba1
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 102 deletions.
173 changes: 84 additions & 89 deletions plugin/core/transports.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from contextlib import closing
from functools import partial
from queue import Queue
import http
import http.client
import json
import multiprocessing.connection
import os
Expand Down Expand Up @@ -49,70 +49,99 @@ def on_stderr_message(self, message: str) -> None:

class AbstractProcessor(Generic[T]):
    """Interface for reading/writing framed payloads of type T to a language server.

    Concrete processors own their underlying I/O channel; callers only pass
    payloads. (Reconstructed: the rendered diff interleaved the old
    reader/writer-parameter signatures with the new ones.)
    """

    def write_data(self, data: T) -> None:
        """Serialize and send one payload to the server."""
        raise NotImplementedError()

    def read_data(self) -> Optional[T]:
        """Block until the next payload arrives; return None for undecodable input."""
        raise NotImplementedError()


class JsonRpcProcessor(AbstractProcessor[Dict[str, Any]]):
def encode_payload(data: Dict[str, Any]) -> bytes:
    """Serialize *data* as compact JSON and return it encoded as UTF-8 bytes."""
    text = json.dumps(
        data,
        ensure_ascii=False,
        check_circular=False,
        separators=(',', ':')
    )
    return text.encode('utf-8')

def write_data(self, writer: IO[bytes], data: Dict[str, Any], is_node_ipc: bool) -> None:
body = self._encode(data)
if not is_node_ipc:
writer.writelines(("Content-Length: {}\r\n\r\n".format(len(body)).encode('ascii'), body))
else:
writer.write(body + b"\n")

def read_data(self, reader: IO[bytes], is_node_ipc: bool) -> Optional[Dict[str, Any]]:
if not is_node_ipc:
headers = http.client.parse_headers(reader) # type: ignore
try:
body = reader.read(int(headers.get("Content-Length")))
except TypeError:
# Expected error on process stopping. Stop the read loop.
raise StopLoopError()
else:
body = reader.readline()
def decode_payload(message: bytes) -> Optional[Dict[str, Any]]:
    """Deserialize a UTF-8-encoded JSON payload into a dict.

    Logs the exception and returns None when *message* is not valid UTF-8 JSON.
    """
    try:
        text = message.decode('utf-8')
        return json.loads(text)
    except Exception as ex:
        exception_log("JSON decode error", ex)
        return None


class StandardProcessor(AbstractProcessor[Dict[str, Any]]):
    """JSON-RPC over a stream pair (stdio or TCP) using Content-Length framing.

    (Reconstructed: the rendered diff left deleted `_encode`/`_decode`
    remnants interleaved inside `read_data`; this is the coherent committed
    implementation, documented.)
    """

    def __init__(self, reader: Optional[IO[bytes]], writer: IO[bytes]):
        # Fail fast if either stream is missing (e.g. a Popen created without PIPE).
        if not reader or not writer:
            raise RuntimeError('Failed initializing transport: reader: {}, writer: {}'.format(reader, writer))
        self._reader = reader
        self._writer = writer

    def write_data(self, data: Dict[str, Any]) -> None:
        """Frame *data* with a Content-Length header and flush it to the server."""
        body = encode_payload(data)
        self._writer.writelines(("Content-Length: {}\r\n\r\n".format(len(body)).encode('ascii'), body))
        self._writer.flush()

    def read_data(self) -> Optional[Dict[str, Any]]:
        """Read one Content-Length framed message; raise StopLoopError when the stream ends."""
        headers = http.client.parse_headers(self._reader)  # type: ignore
        try:
            # Missing Content-Length yields None -> int(None) raises TypeError.
            body = self._reader.read(int(headers.get("Content-Length")))
        except TypeError:
            # Expected error on process stopping. Stop the read loop.
            raise StopLoopError()
        return decode_payload(body)


class NodeIpcProcessor(AbstractProcessor[Dict[str, Any]]):
    """Newline-delimited JSON payloads over a Node.js IPC pipe.

    Mirrors how Node.js child-process IPC frames messages: one JSON document
    per '\\n'-terminated line on the connection's file descriptor.
    """

    def __init__(self, conn: multiprocessing.connection._ConnectionBase):
        self._conn = conn
        # Buffer state must be per-instance. As class attributes (the previous
        # form) the mutable bytearray was shared by every instance, so two
        # concurrently running servers would corrupt each other's stream.
        self._buf = bytearray()
        self._lines = 0

    def write_data(self, data: Dict[str, Any]) -> None:
        """Send one newline-terminated JSON payload, retrying partial writes."""
        body = encode_payload(data) + b"\n"
        while len(body):
            n = self._conn._write(self._conn.fileno(), body)  # type: ignore
            body = body[n:]

    def read_data(self) -> Optional[Dict[str, Any]]:
        """Return the next decoded line; raise StopLoopError on EOF."""
        while self._lines == 0:
            chunk = self._conn._read(self._conn.fileno(), 65536)  # type: ignore
            if len(chunk) == 0:
                # EOF reached: https://docs.python.org/3/library/os.html#os.read
                raise StopLoopError()

            self._buf += chunk
            self._lines += chunk.count(b'\n')

        self._lines -= 1
        message, _, self._buf = self._buf.partition(b'\n')
        return decode_payload(message)


class ProcessTransport(Transport[T]):

def __init__(self, name: str, process: subprocess.Popen, socket: Optional[socket.socket], reader: IO[bytes],
writer: IO[bytes], stderr: Optional[IO[bytes]], processor: AbstractProcessor[T],
callback_object: TransportCallbacks[T], is_node_ipc: bool) -> None:
def __init__(self,
name: str,
process: subprocess.Popen,
socket: Optional[socket.socket],
stderr: Optional[IO[bytes]],
processor: AbstractProcessor[T],
callback_object: TransportCallbacks[T]) -> None:
self._closed = False
self._process = process
self._socket = socket
self._reader = reader
self._writer = writer
self._stderr = stderr
self._processor = processor
self._reader_thread = threading.Thread(target=self._read_loop, name='{}-reader'.format(name))
self._writer_thread = threading.Thread(target=self._write_loop, name='{}-writer'.format(name))
self._stderr_thread = threading.Thread(target=self._stderr_loop, name='{}-stderr'.format(name))
self._callback_object = weakref.ref(callback_object)
self._is_node_ipc = is_node_ipc
self._send_queue = Queue(0) # type: Queue[Union[T, None]]
self._reader_thread.start()
self._writer_thread.start()
Expand Down Expand Up @@ -144,8 +173,8 @@ def __del__(self) -> None:

def _read_loop(self) -> None:
try:
while self._reader:
payload = self._processor.read_data(self._reader, self._is_node_ipc)
while True:
payload = self._processor.read_data()
if payload is None:
continue

Expand Down Expand Up @@ -194,13 +223,11 @@ def invoke() -> None:
def _write_loop(self) -> None:
exception = None # type: Optional[Exception]
try:
while self._writer:
while True:
d = self._send_queue.get()
if d is None:
break
self._processor.write_data(self._writer, d, self._is_node_ipc)
if not self._is_node_ipc:
self._writer.flush()
self._processor.write_data(d)
except (BrokenPipeError, AttributeError):
pass
except Exception as ex:
Expand Down Expand Up @@ -228,35 +255,6 @@ def _stderr_loop(self) -> None:
self._send_queue.put_nowait(None)


# Can be a singleton since it doesn't hold any state.
json_rpc_processor = JsonRpcProcessor()


class NodeIpcIO():
    """File-like wrapper over a multiprocessing pipe (Node.js IPC channel).

    Exposes readline()/write() so the connection can stand in for a byte
    stream; Node.js IPC frames one message per newline-terminated line.
    """

    def __init__(self, conn: multiprocessing.connection._ConnectionBase):
        self._conn = conn
        # Buffer state must be per-instance. As class attributes (the previous
        # form) the mutable bytearray was shared by every instance, so bytes
        # buffered for one server leaked into every later instance.
        self._buf = bytearray()
        self._lines = 0

    # https://github.com/python/cpython/blob/330f1d58282517bdf1f19577ab9317fa9810bf95/Lib/multiprocessing/connection.py#L378-L392
    def readline(self) -> bytearray:
        """Return the next line without its trailing newline, reading as needed."""
        while self._lines == 0:
            # NOTE(review): an empty chunk (EOF) would spin this loop forever;
            # callers appear to rely on external teardown — consider raising.
            chunk = self._conn._read(self._conn.fileno(), 65536)  # type: ignore
            self._buf += chunk
            self._lines += chunk.count(b'\n')

        self._lines -= 1
        line, _, self._buf = self._buf.partition(b'\n')
        return line

    # https://github.com/python/cpython/blob/330f1d58282517bdf1f19577ab9317fa9810bf95/Lib/multiprocessing/connection.py#L369-L376
    def write(self, data: bytes) -> None:
        """Write all of *data*, retrying until partial writes drain completely."""
        while len(data):
            n = self._conn._write(self._conn.fileno(), data)  # type: ignore
            data = data[n:]


def create_transport(config: TransportConfig, cwd: Optional[str],
callback_object: TransportCallbacks) -> Transport[Dict[str, Any]]:
stderr = subprocess.PIPE
Expand Down Expand Up @@ -292,24 +290,22 @@ def start_subprocess() -> subprocess.Popen:
config.listener_socket,
start_subprocess
)
processor = StandardProcessor(reader, writer) # type: AbstractProcessor
else:
process = start_subprocess()
if config.tcp_port:
sock = _connect_tcp(config.tcp_port)
if sock is None:
raise RuntimeError("Failed to connect on port {}".format(config.tcp_port))
reader = sock.makefile('rwb') # type: ignore
writer = reader
reader = writer = sock.makefile('rwb')
processor = StandardProcessor(reader, writer)
elif not config.node_ipc:
reader = process.stdout # type: ignore
writer = process.stdin # type: ignore
processor = StandardProcessor(process.stdout, process.stdin) # type: ignore
else:
reader = writer = NodeIpcIO(config.node_ipc.parent_conn) # type: ignore
if not reader or not writer:
raise RuntimeError('Failed initializing transport: reader: {}, writer: {}'.format(reader, writer))
processor = NodeIpcProcessor(config.node_ipc.parent_conn)

stderr_reader = process.stdout if config.node_ipc else process.stderr
return ProcessTransport(config.name, process, sock, reader, writer, stderr_reader, json_rpc_processor,
callback_object, bool(config.node_ipc))
return ProcessTransport(config.name, process, sock, stderr_reader, processor, callback_object)


_subprocesses = weakref.WeakSet() # type: weakref.WeakSet[subprocess.Popen]
Expand Down Expand Up @@ -403,8 +399,7 @@ def start_in_background(d: _SubprocessData) -> None:
# Await one client connection (blocking!)
sock, _ = listener_socket.accept()
thread.join()
reader = sock.makefile('rwb') # type: IO[bytes]
writer = reader
reader = writer = sock.makefile('rwb')
assert data.process
return data.process, sock, reader, writer

Expand Down
20 changes: 10 additions & 10 deletions plugin/core/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -607,7 +607,7 @@ def map_from_remote_to_local(self, uri: str) -> Tuple[str, bool]:
return _translate_path(uri, self._remote, self._local)


NodeIpc = collections.namedtuple('NodeIpc', 'parent_conn,child_conn')
NodeIpcPipe = collections.namedtuple('NodeIpcPipe', 'parent_conn,child_conn')


class TransportConfig:
Expand All @@ -620,15 +620,10 @@ def __init__(
tcp_port: Optional[int],
env: Dict[str, str],
listener_socket: Optional[socket.socket],
node_ipc: Optional[NodeIpc]
node_ipc: Optional[NodeIpcPipe]
) -> None:
if not command and not tcp_port:
raise ValueError('neither "command" nor "tcp_port" is provided; cannot start a language server')
if node_ipc and (tcp_port or listener_socket):
raise ValueError(
'"tcp_port" and "listener_socket" can\'t be provided in "--node-ipc" mode; ' +
'cannot start a language server'
)
self.name = name
self.command = command
self.tcp_port = tcp_port
Expand All @@ -644,6 +639,7 @@ def __init__(self,
priority_selector: Optional[str] = None,
schemes: Optional[List[str]] = None,
command: Optional[List[str]] = None,
use_node_ipc: bool = False,
binary_args: Optional[List[str]] = None, # DEPRECATED
tcp_port: Optional[int] = None,
auto_complete_selector: Optional[str] = None,
Expand All @@ -668,6 +664,7 @@ def __init__(self,
else:
assert isinstance(binary_args, list)
self.command = binary_args
self.use_node_ipc = use_node_ipc
self.tcp_port = tcp_port
self.auto_complete_selector = auto_complete_selector
self.enabled = enabled
Expand Down Expand Up @@ -701,9 +698,10 @@ def from_sublime_settings(cls, name: str, s: sublime.Settings, file: str) -> "Cl
priority_selector=_read_priority_selector(s),
schemes=s.get("schemes"),
command=read_list_setting(s, "command", []),
use_node_ipc=bool(s.get("use_node_ipc", False)),
tcp_port=s.get("tcp_port"),
auto_complete_selector=s.get("auto_complete_selector"),
# Default to True, because an LSP plugin is enabled iff it is enabled as a Sublime package.
# Default to True, because an LSP plugin is enabled if it is enabled as a Sublime package.
enabled=bool(s.get("enabled", True)),
init_options=init_options,
settings=settings,
Expand Down Expand Up @@ -731,6 +729,7 @@ def from_dict(cls, name: str, d: Dict[str, Any]) -> "ClientConfig":
priority_selector=_read_priority_selector(d),
schemes=schemes,
command=d.get("command", []),
use_node_ipc=d.get("use_node_ipc", False),
tcp_port=d.get("tcp_port"),
auto_complete_selector=d.get("auto_complete_selector"),
enabled=d.get("enabled", False),
Expand Down Expand Up @@ -758,6 +757,7 @@ def from_config(cls, src_config: "ClientConfig", override: Dict[str, Any]) -> "C
priority_selector=_read_priority_selector(override) or src_config.priority_selector,
schemes=override.get("schemes", src_config.schemes),
command=override.get("command", src_config.command),
use_node_ipc=override.get("use_node_ipc", src_config.use_node_ipc),
tcp_port=override.get("tcp_port", src_config.tcp_port),
auto_complete_selector=override.get("auto_complete_selector", src_config.auto_complete_selector),
enabled=override.get("enabled", src_config.enabled),
Expand Down Expand Up @@ -803,8 +803,8 @@ def resolve_transport_config(self, variables: Dict[str, str]) -> TransportConfig
else:
env[key] = sublime.expand_variables(value, variables)
node_ipc = None
if '--node-ipc' in command:
node_ipc = NodeIpc(*multiprocessing.Pipe())
if self.use_node_ipc:
node_ipc = NodeIpcPipe(*multiprocessing.Pipe())
env["NODE_CHANNEL_FD"] = str(node_ipc.child_conn.fileno())
return TransportConfig(self.name, command, tcp_port, env, listener_socket, node_ipc)

Expand Down
11 changes: 11 additions & 0 deletions sublime-package.json
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@
},
"markdownDescription": "The command to start the language server."
},
"ClientUseNodeIpc": {
"type": "boolean",
"default": false,
"markdownDescription": "Communicate with the language server over Node.js IPC. This lets the server print to stdout without disrupting the LSP communication. It's non-standard, but is used by VSCode. The command must be adjusted accordingly, e.g. `--stdio` must be replaced with `--node-ipc` in case of vscode-eslint. `tcp_port` is ignored if this is enabled."
},
"ClientEnabled": {
"type": "boolean",
"default": false,
Expand Down Expand Up @@ -156,6 +161,9 @@
"command": {
"$ref": "sublime://settings/LSP#/definitions/ClientCommand"
},
"use_node_ipc": {
"$ref": "sublime://settings/LSP#/definitions/ClientUseNodeIpc"
},
"enabled": {
"$ref": "sublime://settings/LSP#/definitions/ClientEnabled"
},
Expand Down Expand Up @@ -555,6 +563,9 @@
"command": {
"$ref": "sublime://settings/LSP#/definitions/ClientCommand"
},
"use_node_ipc": {
"$ref": "sublime://settings/LSP#/definitions/ClientUseNodeIpc"
},
"enabled": {
"$ref": "sublime://settings/LSP#/definitions/ClientEnabled"
},
Expand Down
6 changes: 3 additions & 3 deletions tests/test_protocol.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from LSP.plugin.core.protocol import Point, Position, Range, RangeLsp, Request, Notification
from LSP.plugin.core.transports import JsonRpcProcessor
from LSP.plugin.core.transports import encode_payload, decode_payload
import unittest


Expand Down Expand Up @@ -129,9 +129,9 @@ def test_extend(self) -> None:

class EncodingTests(unittest.TestCase):
    """Round-trip tests for the module-level JSON codec helpers.

    (Reconstructed: the rendered diff interleaved the old
    `JsonRpcProcessor._encode/_decode` calls with the new
    `encode_payload`/`decode_payload` ones.)
    """

    def test_encode(self) -> None:
        # Non-ASCII survives because encode_payload uses ensure_ascii=False.
        encoded = encode_payload({"text": "😃"})
        self.assertEqual(encoded, b'{"text":"\xF0\x9F\x98\x83"}')
        decoded = decode_payload(encoded)
        self.assertEqual(decoded, {"text": "😃"})


Expand Down

0 comments on commit dd1fba1

Please sign in to comment.