diff --git a/Source/Shared/Core/Network/NetConnectionTCP.cpp b/Source/Shared/Core/Network/NetConnectionTCP.cpp index 837a94c3..bb3e0cfc 100644 --- a/Source/Shared/Core/Network/NetConnectionTCP.cpp +++ b/Source/Shared/Core/Network/NetConnectionTCP.cpp @@ -23,7 +23,7 @@ namespace { // Maximum backlog of data in a packet streams send queue. Sending // packets beyond this will result in disconnect. - static inline constexpr size_t k_max_send_buffer_size = 256 * 1024; + static inline constexpr size_t k_max_send_buffer_size = 512 * 1024; }; @@ -111,7 +111,7 @@ bool NetConnectionTCP::Listen(int Port) return false; } - if (listen(Socket, 64) < 0) + if (listen(Socket, 1024) < 0) { ErrorS(GetName().c_str(), "Failed to listen on socket on port %i.", Port); return false; diff --git a/Source/Shared/Core/Network/NetConnectionUDP.cpp b/Source/Shared/Core/Network/NetConnectionUDP.cpp index 97ee5128..2b5db85d 100644 --- a/Source/Shared/Core/Network/NetConnectionUDP.cpp +++ b/Source/Shared/Core/Network/NetConnectionUDP.cpp @@ -35,8 +35,9 @@ NetConnectionUDP::NetConnectionUDP(const std::string& InName) RecieveBuffer.resize(64 * 1024); } -NetConnectionUDP::NetConnectionUDP(SocketType ParentSocket, sockaddr_in InDestination, const std::string& InName, const NetIPAddress& InAddress) - : Destination(InDestination) +NetConnectionUDP::NetConnectionUDP(NetConnectionUDP* InParent, SocketType ParentSocket, sockaddr_in InDestination, const std::string& InName, const NetIPAddress& InAddress) + : Parent(InParent) + , Destination(InDestination) , Name(InName) , bChild(true) , Socket(ParentSocket) @@ -75,29 +76,6 @@ bool NetConnectionUDP::Listen(int Port) return false; } - // Set socket to non-blocking mode. -#if defined(_WIN32) - unsigned long mode = 1; - if (int result = ioctlsocket(Socket, FIONBIO, &mode); result != 0) - { - ErrorS(GetName().c_str(), "Failed to set socket to non blocking with error 0x%08x", result); - return false; - } -#else - int flags; - if (flags = fcntl(Socket, F_GETFL, 0); flags == -1) - { - ErrorS(GetName().c_str(), "Failed to get socket flags."); - return false; - } - flags = flags | O_NONBLOCK; - if (int result = fcntl(Socket, F_SETFL, flags); result != 0) - { - ErrorS(GetName().c_str(), "Failed to set socket to non blocking with error 0x%08x", result); - return false; - } -#endif - // Boost buffer sizes int BufferSize = 16 * 1024 * 1024; if (setsockopt(Socket, SOL_SOCKET, SO_RCVBUF, (const char*)&BufferSize, sizeof(BufferSize))) @@ -123,6 +101,15 @@ bool NetConnectionUDP::Listen(int Port) } bListening = true; + bShuttingDownThreads = false; + bErrorOnThreads = false; + + RecieveThread = std::make_unique([&]() { + RecieveThreadEntry(); + }); + SendThread = std::make_unique([&]() { + SendThreadEntry(); + }); return true; } @@ -172,29 +159,6 @@ bool NetConnectionUDP::Connect(std::string Hostname, int Port, bool ForceLastIpE return false; } - // Set socket to non-blocking mode. -#if defined(_WIN32) - unsigned long mode = 1; - if (int result = ioctlsocket(Socket, FIONBIO, &mode); result != 0) - { - ErrorS(GetName().c_str(), "Failed to set socket to non blocking with error 0x%08x", result); - return false; - } -#else - int flags; - if (flags = fcntl(Socket, F_GETFL, 0); flags == -1) - { - ErrorS(GetName().c_str(), "Failed to get socket flags."); - return false; - } - flags = flags | O_NONBLOCK; - if (int result = fcntl(Socket, F_SETFL, flags); result != 0) - { - ErrorS(GetName().c_str(), "Failed to set socket to non blocking with error 0x%08x", result); - return false; - } -#endif - // Boost buffer sizes int BufferSize = 16 * 1024 * 1024; if (setsockopt(Socket, SOL_SOCKET, SO_RCVBUF, (const char*)&BufferSize, sizeof(BufferSize))) @@ -223,6 +187,16 @@ bool NetConnectionUDP::Connect(std::string Hostname, int Port, bool ForceLastIpE Destination.sin_family = AF_INET; memset(Destination.sin_zero, 0, sizeof(Destination.sin_zero)); inet_pton(AF_INET, Hostname.c_str(), &(Destination.sin_addr)); + + bShuttingDownThreads = false; + bErrorOnThreads = false; + + RecieveThread = std::make_unique([&]() { + RecieveThreadEntry(); + }); + SendThread = std::make_unique([&]() { + SendThreadEntry(); + }); return true; } @@ -272,45 +246,24 @@ bool NetConnectionUDP::Recieve(std::vector& Buffer, int Offset, int Cou bool NetConnectionUDP::Send(const std::vector& Buffer, int Offset, int Count) { - int Result = sendto(Socket, (char*)Buffer.data() + Offset, Count, 0, (sockaddr*)&Destination, sizeof(sockaddr_in)); - if (Result < 0) + NetConnectionUDP* EnqueueConnection = this; + if (bChild) { -#if defined(_WIN32) - int error = WSAGetLastError(); -#else - int error = errno; -#endif + EnqueueConnection = Parent; + } - // Blocking is fine, just return. -#if defined(_WIN32) - if (error == WSAEWOULDBLOCK) -#else - if (error == EWOULDBLOCK || error == EAGAIN) -#endif - { - return false; - } + std::unique_ptr Pending = std::make_unique(); + Pending->Data.resize(Count); + memcpy(Pending->Data.data(), Buffer.data() + Offset, Count); + Pending->SourceAddress = Destination; + Pending->ProcessTime = 0.0f; - ErrorS(GetName().c_str(), "Failed to send with error 0x%08x.", error); - return false; - } - else if (Result != Count) { - ErrorS(GetName().c_str(), "Failed to send packet in its entirety, wanted to send %i but sent %i. Datagram larger than MTU?", Count, Result); - return false; + std::unique_lock lock(EnqueueConnection->SendQueueMutex); + EnqueueConnection->SendQueue.push(std::move(Pending)); + EnqueueConnection->SendQueueCvar.notify_all(); } - - /* - LogS(GetName().c_str(), ">> %i to %i.%i.%i.%i:%i", Result, - Destination.sin_addr.S_un.S_un_b.s_b1, - Destination.sin_addr.S_un.S_un_b.s_b2, - Destination.sin_addr.S_un.S_un_b.s_b3, - Destination.sin_addr.S_un.S_un_b.s_b4, - ntohs(Destination.sin_port)); - */ - - Debug::UdpBytesSent.Add(Count); - + return true; } @@ -323,6 +276,18 @@ bool NetConnectionUDP::Disconnect() if (!bChild) { + bShuttingDownThreads = true; + if (RecieveThread) + { + RecieveThread->join(); + RecieveThread = nullptr; + } + if (SendThread) + { + SendThread->join(); + SendThread = nullptr; + } + #ifdef _WIN32 closesocket(Socket); #else @@ -393,7 +358,7 @@ void NetConnectionUDP::ProcessPacket(const PendingPacket& Packet) ); #endif - std::shared_ptr NewConnection = std::make_shared(Socket, Packet.SourceAddress, ClientName.data(), NetClientAddress); + std::shared_ptr NewConnection = std::make_shared(this, Socket, Packet.SourceAddress, ClientName.data(), NetClientAddress); NewConnection->RecieveQueue.push_back(Packet.Data); NewConnections.push_back(NewConnection); ChildConnections.push_back(NewConnection); @@ -405,100 +370,185 @@ void NetConnectionUDP::ProcessPacket(const PendingPacket& Packet) } } -bool NetConnectionUDP::Pump() +void NetConnectionUDP::RecieveThreadEntry() { - if (Socket == INVALID_SOCKET_VALUE) + while (!bShuttingDownThreads) { - return false; - } - - if (!bChild) - { - while (true) - { - // Recieve any pending datagrams and route to the appropriate child recieve queue. + // Recieve any pending datagrams and route to the appropriate child recieve queue. + socklen_t SourceAddressSize = sizeof(struct sockaddr); + sockaddr_in SourceAddress = { 0 }; - socklen_t SourceAddressSize = sizeof(struct sockaddr); - sockaddr_in SourceAddress = { 0 }; + int Flags = 0; + int Result = recvfrom(Socket, (char*)RecieveBuffer.data(), (int)RecieveBuffer.size(), Flags, (sockaddr*)&SourceAddress, &SourceAddressSize); + if (Result < 0) + { +#if defined(_WIN32) + int error = WSAGetLastError(); +#else + int error = errno; +#endif - int Flags = 0; -#ifdef __unix__ - Flags |= MSG_DONTWAIT; + // Blocking is fine, just return. +#if defined(_WIN32) + if (error == WSAEWOULDBLOCK) +#else + if (error == EWOULDBLOCK || error == EAGAIN) #endif + { + continue; + } - int Result = recvfrom(Socket, (char*)RecieveBuffer.data(), (int)RecieveBuffer.size(), Flags, (sockaddr*)&SourceAddress, &SourceAddressSize); + ErrorS(GetName().c_str(), "Failed to recieve with error 0x%08x.", error); + // We ignore the error and keep continuing to try and recieve. + } + else if (Result > 0) + { + std::vector Packet(RecieveBuffer.data(), RecieveBuffer.data() + Result); + + bool bDropPacket = false; + + if constexpr (k_emulate_dropped_backs) + { + if (FRandRange(0.0f, 1.0f) <= k_drop_packet_probability) + { + bDropPacket = true; + } + } + + if (!bDropPacket) + { + double Latency = k_latency_minimum + FRandRange(-k_latency_variance, k_latency_variance); + + std::unique_ptr Pending = std::make_unique(); + Pending->Data = Packet; + Pending->SourceAddress = SourceAddress; + Pending->ProcessTime = GetSeconds() + (Latency / 1000.0f); + + std::unique_lock lock(PendingPacketsMutex); + PendingPackets.push(std::move(Pending)); + } + + //Log("<< %zi bytes", (size_t)Result); + + Debug::UdpBytesRecieved.Add(Result); + } + } +} + +void NetConnectionUDP::SendThreadEntry() +{ + while (!bShuttingDownThreads) + { + std::unique_ptr SendPacket; + + // Grab next packet to send. + { + std::unique_lock lock(SendQueueMutex); + while (true) + { + if (!SendQueue.empty()) + { + SendPacket = std::move(SendQueue.front()); + SendQueue.pop(); + break; + } + else + { + SendQueueCvar.wait(lock); + } + } + } + + // Send the packet! + while (true) + { + int Result = sendto(Socket, (char*)SendPacket->Data.data(), SendPacket->Data.size(), 0, (sockaddr*)&SendPacket->SourceAddress, sizeof(sockaddr_in)); if (Result < 0) { - #if defined(_WIN32) + #if defined(_WIN32) int error = WSAGetLastError(); - #else + #else int error = errno; - #endif + #endif // Blocking is fine, just return. - #if defined(_WIN32) + #if defined(_WIN32) if (error == WSAEWOULDBLOCK) - #else + #else if (error == EWOULDBLOCK || error == EAGAIN) - #endif + #endif { - break; + continue; } - ErrorS(GetName().c_str(), "Failed to recieve with error 0x%08x.", error); - return false; + ErrorS(GetName().c_str(), "Failed to send with error 0x%08x.", error); + bErrorOnThreads = true; + return; } - else if (Result > 0) + else if (Result != SendPacket->Data.size()) { - std::vector Packet(RecieveBuffer.data(), RecieveBuffer.data() + Result); - - bool bDropPacket = false; - - if constexpr (k_emulate_dropped_backs) - { - if (FRandRange(0.0f, 1.0f) <= k_drop_packet_probability) - { - bDropPacket = true; - } - } + ErrorS(GetName().c_str(), "Failed to send packet in its entirety, wanted to send %i but sent %i. Datagram larger than MTU?", SendPacket->Data.size(), Result); + bErrorOnThreads = true; + return; + } - if (!bDropPacket) - { - double Latency = k_latency_minimum + FRandRange(-k_latency_variance, k_latency_variance); - - PendingPacket Pending; - Pending.Data = Packet; - Pending.SourceAddress = SourceAddress; - Pending.ProcessTime = GetSeconds() + (Latency / 1000.0f); - - if constexpr (k_emulate_latency) - { - PendingPackets.push_back(Pending); - } - else - { - ProcessPacket(Pending); - } - } + Debug::UdpBytesSent.Add(SendPacket->Data.size()); + break; + } - Debug::UdpBytesRecieved.Add(Result); + //Log(">> %zi bytes", SendPacket->Data.size()); + } +} - //LogS(GetName().c_str(), "<< %i", Result); - } +bool NetConnectionUDP::Pump() +{ + if (Socket == INVALID_SOCKET_VALUE) + { + return false; + } + + if (!bChild) + { + if (bErrorOnThreads) + { + return false; } } // Recieve pending packets. - for (auto iter = PendingPackets.begin(); iter != PendingPackets.end(); /* empty */) { - if (GetSeconds() > iter->ProcessTime) + std::vector> PacketsToProcess; + + // Grab all the packets in the recieve queue that currently need processing. + // Keep this code slim so we don't hold the mutex longer than neccessary (as we don't currently do this lock-free) { - ProcessPacket(*iter); - iter = PendingPackets.erase(iter); + std::unique_lock lock(PendingPacketsMutex); + while (!PendingPackets.empty()) + { + bool Process = true; + PendingPacket* NextPacket = PendingPackets.front().get(); + + if constexpr (k_emulate_latency) + { + Process = (GetSeconds() >= NextPacket->ProcessTime); + } + + if (Process) + { + PacketsToProcess.push_back(std::move(PendingPackets.front())); + PendingPackets.pop(); + } + else + { + break; + } + } } - else + + // Process away. + for (auto& packet : PacketsToProcess) { - iter++; + ProcessPacket(*packet); } } diff --git a/Source/Shared/Core/Network/NetConnectionUDP.h b/Source/Shared/Core/Network/NetConnectionUDP.h index 06e85583..1a6caf8f 100644 --- a/Source/Shared/Core/Network/NetConnectionUDP.h +++ b/Source/Shared/Core/Network/NetConnectionUDP.h @@ -27,6 +27,10 @@ #endif #include +#include +#include +#include +#include class NetConnectionUDP : public NetConnection @@ -43,7 +47,7 @@ class NetConnectionUDP #endif public: - NetConnectionUDP(SocketType ParentSocket, sockaddr_in DestinationIP, const std::string& InName, const NetIPAddress& InAddress); + NetConnectionUDP(NetConnectionUDP* Parent, SocketType ParentSocket, sockaddr_in DestinationIP, const std::string& InName, const NetIPAddress& InAddress); NetConnectionUDP(const std::string& InName); virtual ~NetConnectionUDP(); @@ -77,11 +81,15 @@ class NetConnectionUDP void ProcessPacket(const PendingPacket& Packet); + void RecieveThreadEntry(); + void SendThreadEntry(); + private: std::string Name; NetIPAddress IPAddress; + NetConnectionUDP* Parent = nullptr; SocketType Socket = INVALID_SOCKET_VALUE; bool bListening = false; @@ -89,12 +97,22 @@ class NetConnectionUDP sockaddr_in Destination = {}; - std::vector PendingPackets; - std::vector RecieveBuffer; std::vector> RecieveQueue; std::vector> NewConnections; std::vector> ChildConnections; + std::mutex PendingPacketsMutex; + std::queue> PendingPackets; + + std::queue> SendQueue; + std::mutex SendQueueMutex; + std::condition_variable SendQueueCvar; + + std::unique_ptr RecieveThread; + std::unique_ptr SendThread; + bool bShuttingDownThreads = false; + bool bErrorOnThreads = false; + }; \ No newline at end of file