diff --git a/Sources/Heze/Socket/HezeSocketClient.swift b/Sources/Heze/Socket/HezeSocketClient.swift index a0d7a08..4a07c04 100644 --- a/Sources/Heze/Socket/HezeSocketClient.swift +++ b/Sources/Heze/Socket/HezeSocketClient.swift @@ -18,8 +18,8 @@ open class HezeSocketClient: HezeSocketHandler { return socket?.isConnected ?? false } - open override func receiveBytes(_ bytes: [UInt8]) -> HezeResponsable? { - return nil + open override func receiveBytes(_ bytes: [UInt8], completion: @escaping (HezeResponsable?) -> Void) { + completion(nil) } open func close() { diff --git a/Sources/Heze/Socket/HezeSocketHandler.swift b/Sources/Heze/Socket/HezeSocketHandler.swift index 1764d1f..64d8e60 100644 --- a/Sources/Heze/Socket/HezeSocketHandler.swift +++ b/Sources/Heze/Socket/HezeSocketHandler.swift @@ -14,6 +14,7 @@ open class HezeSocketHandler: HezeHandler, WebSocketSessionHandler { public var socket: WebSocket? = nil public var remoteHost: String? public var remotePort: UInt16? + public var queue = DispatchQueue(label: "heze.socket.handler", attributes: .concurrent) public var buffer = [UInt8]() open var socketProtocol: String? { @@ -72,35 +73,52 @@ open class HezeSocketHandler: HezeHandler, WebSocketSessionHandler { guard let self = self else { return } - guard let bytes = bytes else { + + switch opcodeType { + case .binary, .text, .continuation: + guard let bytes = bytes else { + socket.close() + self.closed() + return + } + self.queue.async { [weak self] in + guard let self = self else { + return + } + self.handleBytes(bytes, opcodeType: opcodeType, final: final, socket: socket) { res in + if let bytes = res?.toBytes() { + self.sendBytes(bytes) { } + } + } + } + case .ping: + self.ping() + case .pong: + self.pong() + case .close, .invalid: socket.close() self.closed() return } - if let bytes = self.handleBytes(bytes, opcodeType: opcodeType, final: final, socket: socket)?.toBytes() { - self.sendBytes(bytes) { - self.handleSession(request: req, socket: socket) - } - } else { - self.handleSession(request: req, socket: socket) - } + + self.handleSession(request: req, socket: socket) } } - open func handleBytes(_ bytes: [UInt8], opcodeType: WebSocket.OpcodeType, final: Bool, socket: WebSocket) -> HezeResponsable? { + open func handleBytes(_ bytes: [UInt8], opcodeType: WebSocket.OpcodeType, final: Bool, socket: WebSocket, completion: @escaping (HezeResponsable?) -> Void) { buffer.append(contentsOf: bytes) if final { defer { buffer.removeAll(keepingCapacity: false) } - return receiveBytes(buffer) + receiveBytes(buffer, completion: completion) } else { - return nil + completion(nil) } } - open func receiveBytes(_ bytes: [UInt8]) -> HezeResponsable? { - return nil + open func receiveBytes(_ bytes: [UInt8], completion: @escaping (HezeResponsable?) -> Void) { + completion(nil) } open func sendMessage(_ msg: String, completion: @escaping () -> Void) { @@ -118,4 +136,12 @@ open class HezeSocketHandler: HezeHandler, WebSocketSessionHandler { open func closed() { } + + open func ping() { + + } + + open func pong() { + + } } diff --git a/Sources/HezeDemo/Socket/RepeatSocketClient.swift b/Sources/HezeDemo/Socket/RepeatSocketClient.swift index 4ca41b7..0780014 100644 --- a/Sources/HezeDemo/Socket/RepeatSocketClient.swift +++ b/Sources/HezeDemo/Socket/RepeatSocketClient.swift @@ -12,8 +12,8 @@ var RepeatSocketClientStorage: HezeSocketHandler? class RepeatSocketClient: HezeSocketClient { - override func receiveBytes(_ bytes: [UInt8]) -> HezeResponsable? { - return bytes + open override func receiveBytes(_ bytes: [UInt8], completion: @escaping (HezeResponsable?) -> Void) { + completion(bytes) } override class func create(context: HezeContext) -> HezeSocketHandler? {