diff --git a/uvloop/handles/stream.pxd b/uvloop/handles/stream.pxd index 8ca87437..df2e3615 100644 --- a/uvloop/handles/stream.pxd +++ b/uvloop/handles/stream.pxd @@ -16,6 +16,8 @@ cdef class UVStream(UVBaseTransport): Py_buffer _read_pybuf bint _read_pybuf_acquired + cpdef write(self, object buf) + # All "inline" methods are final cdef inline _init(self, Loop loop, object protocol, Server server, diff --git a/uvloop/handles/stream.pyx b/uvloop/handles/stream.pyx index 9fbc5a51..30f83979 100644 --- a/uvloop/handles/stream.pyx +++ b/uvloop/handles/stream.pyx @@ -232,7 +232,9 @@ cdef class UVStream(UVBaseTransport): UVBaseTransport._set_protocol(self, protocol) - if (hasattr(protocol, 'get_buffer') and + if isinstance(protocol, SSLProtocol): + self.__buffered = 1 + elif (hasattr(protocol, 'get_buffer') and not isinstance(protocol, aio_Protocol)): try: self._protocol_get_buffer = protocol.get_buffer @@ -674,7 +676,7 @@ cdef class UVStream(UVBaseTransport): self.__reading, id(self)) - def write(self, object buf): + cpdef write(self, object buf): self._ensure_alive() if self._eof: @@ -924,9 +926,24 @@ cdef void __uv_stream_buffered_alloc( "UVStream alloc buffer callback") == 0: return + cdef UVStream sc = stream.data + + # Fast pass for our own SSLProtocol + # avoid python calls, memoryviews, context enter/exit, etc + if isinstance(sc._protocol, SSLProtocol): + try: + (sc._protocol).get_buffer_impl( + suggested_size, &uvbuf.base, &uvbuf.len) + return + except BaseException as exc: + # Can't call 'sc._fatal_error' or 'sc._close', libuv will SF. + # We'll do it later in __uv_stream_buffered_on_read when we + # receive UV_ENOBUFS. + uvbuf.len = 0 + uvbuf.base = NULL + return + cdef: - UVStream sc = stream.data - Loop loop = sc._loop Py_buffer* pybuf = &sc._read_pybuf int got_buf = 0 @@ -987,7 +1004,7 @@ cdef void __uv_stream_buffered_on_read( return try: - if nread > 0 and not sc._read_pybuf_acquired: + if nread > 0 and not isinstance(sc._protocol, SSLProtocol) and not sc._read_pybuf_acquired: # From libuv docs: # nread is > 0 if there is data available or < 0 on error. When # we’ve reached EOF, nread will be set to UV_EOF. When @@ -1008,12 +1025,20 @@ cdef void __uv_stream_buffered_on_read( if UVLOOP_DEBUG: loop._debug_stream_read_cb_total += 1 - run_in_context1(sc.context, sc._protocol_buffer_updated, nread) + if isinstance(sc._protocol, SSLProtocol): + Context_Enter(sc.context) + try: + (sc._protocol).buffer_updated_impl(nread) + finally: + Context_Exit(sc.context) + else: + run_in_context1(sc.context, sc._protocol_buffer_updated, nread) except BaseException as exc: if UVLOOP_DEBUG: loop._debug_stream_read_cb_errors_total += 1 sc._fatal_error(exc, False) finally: - sc._read_pybuf_acquired = 0 - PyBuffer_Release(pybuf) + if sc._read_pybuf_acquired: + sc._read_pybuf_acquired = 0 + PyBuffer_Release(pybuf) diff --git a/uvloop/sslproto.pxd b/uvloop/sslproto.pxd index 3da10f00..cf3205d1 100644 --- a/uvloop/sslproto.pxd +++ b/uvloop/sslproto.pxd @@ -59,7 +59,6 @@ cdef class SSLProtocol: object _outgoing_read char* _ssl_buffer size_t _ssl_buffer_len - object _ssl_buffer_view SSLProtocolState _state size_t _conn_lost AppProtocolState _app_state @@ -84,55 +83,61 @@ cdef class SSLProtocol: object _handshake_timeout_handle object _shutdown_timeout_handle - cdef _set_app_protocol(self, app_protocol) - cdef _wakeup_waiter(self, exc=*) - cdef _get_extra_info(self, name, default=*) - cdef _set_state(self, SSLProtocolState new_state) + # Instead of doing python calls, c methods *_impl are called directly + # from stream.pyx + + cdef inline get_buffer_impl(self, size_t n, char** buf, size_t* buf_size) + cdef inline buffer_updated_impl(self, size_t nbytes) + + cdef inline _set_app_protocol(self, app_protocol) + cdef inline _wakeup_waiter(self, exc=*) + cdef inline _get_extra_info(self, name, default=*) + cdef inline _set_state(self, SSLProtocolState new_state) # Handshake flow - cdef _start_handshake(self) - cdef _check_handshake_timeout(self) - cdef _do_handshake(self) - cdef _on_handshake_complete(self, handshake_exc) + cdef inline _start_handshake(self) + cdef inline _check_handshake_timeout(self) + cdef inline _do_handshake(self) + cdef inline _on_handshake_complete(self, handshake_exc) # Shutdown flow - cdef _start_shutdown(self, object context=*) - cdef _check_shutdown_timeout(self) - cdef _do_read_into_void(self, object context) - cdef _do_flush(self, object context=*) - cdef _do_shutdown(self, object context=*) - cdef _on_shutdown_complete(self, shutdown_exc) - cdef _abort(self, exc) + cdef inline _start_shutdown(self, object context=*) + cdef inline _check_shutdown_timeout(self) + cdef inline _do_read_into_void(self, object context) + cdef inline _do_flush(self, object context=*) + cdef inline _do_shutdown(self, object context=*) + cdef inline _on_shutdown_complete(self, shutdown_exc) + cdef inline _abort(self, exc) # Outgoing flow - cdef _write_appdata(self, list_of_data, object context) - cdef _do_write(self) - cdef _process_outgoing(self) + cdef inline _write_appdata(self, list_of_data, object context) + cdef inline _do_write(self) + cdef inline _process_outgoing(self) # Incoming flow - cdef _do_read(self) - cdef _do_read__buffered(self) - cdef _do_read__copied(self) - cdef _call_eof_received(self, object context=*) + cdef inline _do_read(self) + cdef inline _do_read__buffered(self) + cdef inline _do_read__copied(self) + cdef inline _call_eof_received(self, object context=*) # Flow control for writes from APP socket - cdef _control_app_writing(self, object context=*) - cdef size_t _get_write_buffer_size(self) - cdef _set_write_buffer_limits(self, high=*, low=*) + cdef inline _control_app_writing(self, object context=*) + cdef inline size_t _get_write_buffer_size(self) + cdef inline _set_write_buffer_limits(self, high=*, low=*) # Flow control for reads to APP socket - cdef _pause_reading(self) - cdef _resume_reading(self, object context) + cdef inline _pause_reading(self) + cdef inline _resume_reading(self, object context) # Flow control for reads from SSL socket - cdef _control_ssl_reading(self) - cdef _set_read_buffer_limits(self, high=*, low=*) - cdef size_t _get_read_buffer_size(self) - cdef _fatal_error(self, exc, message=*) + cdef inline _control_ssl_reading(self) + cdef inline _set_read_buffer_limits(self, high=*, low=*) + cdef inline size_t _get_read_buffer_size(self) + cdef inline _fatal_error(self, exc, message=*) diff --git a/uvloop/sslproto.pyx b/uvloop/sslproto.pyx index 42bb7644..5442b5d0 100644 --- a/uvloop/sslproto.pyx +++ b/uvloop/sslproto.pyx @@ -204,11 +204,8 @@ cdef class SSLProtocol: self._ssl_buffer = PyMem_RawMalloc(self._ssl_buffer_len) if not self._ssl_buffer: raise MemoryError() - self._ssl_buffer_view = PyMemoryView_FromMemory( - self._ssl_buffer, self._ssl_buffer_len, PyBUF_WRITE) def __dealloc__(self): - self._ssl_buffer_view = None PyMem_RawFree(self._ssl_buffer) self._ssl_buffer = NULL self._ssl_buffer_len = 0 @@ -358,7 +355,7 @@ cdef class SSLProtocol: self._handshake_timeout_handle.cancel() self._handshake_timeout_handle = None - def get_buffer(self, n): + cdef get_buffer_impl(self, size_t n, char** buf, size_t* buf_size): cdef size_t want = n if want > SSL_READ_MAX_SIZE: want = SSL_READ_MAX_SIZE @@ -367,11 +364,11 @@ cdef class SSLProtocol: if not self._ssl_buffer: raise MemoryError() self._ssl_buffer_len = want - self._ssl_buffer_view = PyMemoryView_FromMemory( - self._ssl_buffer, want, PyBUF_WRITE) - return self._ssl_buffer_view - def buffer_updated(self, nbytes): + buf[0] = self._ssl_buffer + buf_size[0] = self._ssl_buffer_len + + cdef buffer_updated_impl(self, size_t nbytes): self._incoming_write(PyMemoryView_FromMemory( self._ssl_buffer, nbytes, PyBUF_WRITE)) @@ -387,6 +384,18 @@ cdef class SSLProtocol: elif self._state == SHUTDOWN: self._do_shutdown() + def get_buffer(self, size_t n): + # This pure python call is still used by some very peculiar test cases + cdef: + char* buf + size_t buf_size + + self.get_buffer_impl(n, &buf, &buf_size) + return PyMemoryView_FromMemory(buf, buf_size, PyBUF_WRITE) + + def buffer_updated(self, size_t nbytes): + self.buffer_updated_impl(nbytes) + def eof_received(self): """Called when the other end of the low-level stream is half-closed. @@ -696,7 +705,10 @@ cdef class SSLProtocol: if not self._ssl_writing_paused: data = self._outgoing_read() if len(data): - self._transport.write(data) + if isinstance(self._transport, UVStream): + (self._transport).write(data) + else: + self._transport.write(data) # Incoming flow