Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve SSL performance by calling SSLProtocol methods directly, not through the python's vectorcall interface #626

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions uvloop/handles/stream.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ cdef class UVStream(UVBaseTransport):
Py_buffer _read_pybuf
bint _read_pybuf_acquired

cpdef write(self, object buf)

# All "inline" methods are final

cdef inline _init(self, Loop loop, object protocol, Server server,
Expand Down
41 changes: 33 additions & 8 deletions uvloop/handles/stream.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,9 @@ cdef class UVStream(UVBaseTransport):

UVBaseTransport._set_protocol(self, protocol)

if (hasattr(protocol, 'get_buffer') and
if isinstance(protocol, SSLProtocol):
self.__buffered = 1
elif (hasattr(protocol, 'get_buffer') and
not isinstance(protocol, aio_Protocol)):
try:
self._protocol_get_buffer = protocol.get_buffer
Expand Down Expand Up @@ -674,7 +676,7 @@ cdef class UVStream(UVBaseTransport):
self.__reading,
id(self))

def write(self, object buf):
cpdef write(self, object buf):
self._ensure_alive()

if self._eof:
Expand Down Expand Up @@ -924,9 +926,24 @@ cdef void __uv_stream_buffered_alloc(
"UVStream alloc buffer callback") == 0:
return

cdef UVStream sc = <UVStream>stream.data

# Fast pass for our own SSLProtocol
# avoid python calls, memoryviews, context enter/exit, etc
if isinstance(sc._protocol, SSLProtocol):
try:
(<SSLProtocol>sc._protocol).get_buffer_impl(
suggested_size, &uvbuf.base, &uvbuf.len)
return
except BaseException as exc:
# Can't call 'sc._fatal_error' or 'sc._close', libuv will SF.
# We'll do it later in __uv_stream_buffered_on_read when we
# receive UV_ENOBUFS.
uvbuf.len = 0
uvbuf.base = NULL
return

cdef:
UVStream sc = <UVStream>stream.data
Loop loop = sc._loop
Py_buffer* pybuf = &sc._read_pybuf
int got_buf = 0

Expand Down Expand Up @@ -987,7 +1004,7 @@ cdef void __uv_stream_buffered_on_read(
return

try:
if nread > 0 and not sc._read_pybuf_acquired:
if nread > 0 and not isinstance(sc._protocol, SSLProtocol) and not sc._read_pybuf_acquired:
# From libuv docs:
# nread is > 0 if there is data available or < 0 on error. When
# we’ve reached EOF, nread will be set to UV_EOF. When
Expand All @@ -1008,12 +1025,20 @@ cdef void __uv_stream_buffered_on_read(
if UVLOOP_DEBUG:
loop._debug_stream_read_cb_total += 1

run_in_context1(sc.context, sc._protocol_buffer_updated, nread)
if isinstance(sc._protocol, SSLProtocol):
Context_Enter(sc.context)
try:
(<SSLProtocol>sc._protocol).buffer_updated_impl(nread)
finally:
Context_Exit(sc.context)
else:
run_in_context1(sc.context, sc._protocol_buffer_updated, nread)
except BaseException as exc:
if UVLOOP_DEBUG:
loop._debug_stream_read_cb_errors_total += 1

sc._fatal_error(exc, False)
finally:
sc._read_pybuf_acquired = 0
PyBuffer_Release(pybuf)
if sc._read_pybuf_acquired:
sc._read_pybuf_acquired = 0
PyBuffer_Release(pybuf)
69 changes: 37 additions & 32 deletions uvloop/sslproto.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ cdef class SSLProtocol:
object _outgoing_read
char* _ssl_buffer
size_t _ssl_buffer_len
object _ssl_buffer_view
SSLProtocolState _state
size_t _conn_lost
AppProtocolState _app_state
Expand All @@ -84,55 +83,61 @@ cdef class SSLProtocol:
object _handshake_timeout_handle
object _shutdown_timeout_handle

cdef _set_app_protocol(self, app_protocol)
cdef _wakeup_waiter(self, exc=*)
cdef _get_extra_info(self, name, default=*)
cdef _set_state(self, SSLProtocolState new_state)
# Instead of doing python calls, c methods *_impl are called directly
# from stream.pyx

cdef inline get_buffer_impl(self, size_t n, char** buf, size_t* buf_size)
cdef inline buffer_updated_impl(self, size_t nbytes)

cdef inline _set_app_protocol(self, app_protocol)
cdef inline _wakeup_waiter(self, exc=*)
cdef inline _get_extra_info(self, name, default=*)
cdef inline _set_state(self, SSLProtocolState new_state)

# Handshake flow

cdef _start_handshake(self)
cdef _check_handshake_timeout(self)
cdef _do_handshake(self)
cdef _on_handshake_complete(self, handshake_exc)
cdef inline _start_handshake(self)
cdef inline _check_handshake_timeout(self)
cdef inline _do_handshake(self)
cdef inline _on_handshake_complete(self, handshake_exc)

# Shutdown flow

cdef _start_shutdown(self, object context=*)
cdef _check_shutdown_timeout(self)
cdef _do_read_into_void(self, object context)
cdef _do_flush(self, object context=*)
cdef _do_shutdown(self, object context=*)
cdef _on_shutdown_complete(self, shutdown_exc)
cdef _abort(self, exc)
cdef inline _start_shutdown(self, object context=*)
cdef inline _check_shutdown_timeout(self)
cdef inline _do_read_into_void(self, object context)
cdef inline _do_flush(self, object context=*)
cdef inline _do_shutdown(self, object context=*)
cdef inline _on_shutdown_complete(self, shutdown_exc)
cdef inline _abort(self, exc)

# Outgoing flow

cdef _write_appdata(self, list_of_data, object context)
cdef _do_write(self)
cdef _process_outgoing(self)
cdef inline _write_appdata(self, list_of_data, object context)
cdef inline _do_write(self)
cdef inline _process_outgoing(self)

# Incoming flow

cdef _do_read(self)
cdef _do_read__buffered(self)
cdef _do_read__copied(self)
cdef _call_eof_received(self, object context=*)
cdef inline _do_read(self)
cdef inline _do_read__buffered(self)
cdef inline _do_read__copied(self)
cdef inline _call_eof_received(self, object context=*)

# Flow control for writes from APP socket

cdef _control_app_writing(self, object context=*)
cdef size_t _get_write_buffer_size(self)
cdef _set_write_buffer_limits(self, high=*, low=*)
cdef inline _control_app_writing(self, object context=*)
cdef inline size_t _get_write_buffer_size(self)
cdef inline _set_write_buffer_limits(self, high=*, low=*)

# Flow control for reads to APP socket

cdef _pause_reading(self)
cdef _resume_reading(self, object context)
cdef inline _pause_reading(self)
cdef inline _resume_reading(self, object context)

# Flow control for reads from SSL socket

cdef _control_ssl_reading(self)
cdef _set_read_buffer_limits(self, high=*, low=*)
cdef size_t _get_read_buffer_size(self)
cdef _fatal_error(self, exc, message=*)
cdef inline _control_ssl_reading(self)
cdef inline _set_read_buffer_limits(self, high=*, low=*)
cdef inline size_t _get_read_buffer_size(self)
cdef inline _fatal_error(self, exc, message=*)
30 changes: 21 additions & 9 deletions uvloop/sslproto.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -204,11 +204,8 @@ cdef class SSLProtocol:
self._ssl_buffer = <char*>PyMem_RawMalloc(self._ssl_buffer_len)
if not self._ssl_buffer:
raise MemoryError()
self._ssl_buffer_view = PyMemoryView_FromMemory(
self._ssl_buffer, self._ssl_buffer_len, PyBUF_WRITE)

def __dealloc__(self):
self._ssl_buffer_view = None
PyMem_RawFree(self._ssl_buffer)
self._ssl_buffer = NULL
self._ssl_buffer_len = 0
Expand Down Expand Up @@ -358,7 +355,7 @@ cdef class SSLProtocol:
self._handshake_timeout_handle.cancel()
self._handshake_timeout_handle = None

def get_buffer(self, n):
cdef get_buffer_impl(self, size_t n, char** buf, size_t* buf_size):
cdef size_t want = n
if want > SSL_READ_MAX_SIZE:
want = SSL_READ_MAX_SIZE
Expand All @@ -367,11 +364,11 @@ cdef class SSLProtocol:
if not self._ssl_buffer:
raise MemoryError()
self._ssl_buffer_len = want
self._ssl_buffer_view = PyMemoryView_FromMemory(
self._ssl_buffer, want, PyBUF_WRITE)
return self._ssl_buffer_view

def buffer_updated(self, nbytes):
buf[0] = self._ssl_buffer
buf_size[0] = self._ssl_buffer_len

cdef buffer_updated_impl(self, size_t nbytes):
self._incoming_write(PyMemoryView_FromMemory(
self._ssl_buffer, nbytes, PyBUF_WRITE))

Expand All @@ -387,6 +384,18 @@ cdef class SSLProtocol:
elif self._state == SHUTDOWN:
self._do_shutdown()

def get_buffer(self, size_t n):
# This pure python call is still used by some very peculiar test cases
cdef:
char* buf
size_t buf_size

self.get_buffer_impl(n, &buf, &buf_size)
return PyMemoryView_FromMemory(buf, buf_size, PyBUF_WRITE)

def buffer_updated(self, size_t nbytes):
self.buffer_updated_impl(nbytes)

def eof_received(self):
"""Called when the other end of the low-level stream
is half-closed.
Expand Down Expand Up @@ -696,7 +705,10 @@ cdef class SSLProtocol:
if not self._ssl_writing_paused:
data = self._outgoing_read()
if len(data):
self._transport.write(data)
if isinstance(self._transport, UVStream):
(<UVStream>self._transport).write(data)
else:
self._transport.write(data)

# Incoming flow

Expand Down
Loading