Skip to content

Commit

Permalink
Merge pull request #857 from Expensify/master
Browse files Browse the repository at this point in the history
Update expensify_prod branch
  • Loading branch information
justinpersaud authored Aug 21, 2020
2 parents 5464373 + 9e1861a commit 1b81309
Show file tree
Hide file tree
Showing 17 changed files with 153 additions and 74 deletions.
6 changes: 5 additions & 1 deletion BedrockServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -637,6 +637,8 @@ void BedrockServer::sync(const SData& args,
// This is sort of the "default" case after checking if this command was complete above. If so,
// we'll fall through to calling processCommand below.
} else if (result == BedrockCore::RESULT::ABANDONED_FOR_CHECKPOINT) {
// Finished with this.
server._syncThreadCommitMutex.unlock();
SINFO("[checkpoint] Re-queuing abandoned command (from peek) in sync thread");
server._commandQueue.push(move(command));
break;
Expand Down Expand Up @@ -684,6 +686,8 @@ void BedrockServer::sync(const SData& args,
server._reply(command);
}
} else if (result == BedrockCore::RESULT::ABANDONED_FOR_CHECKPOINT) {
// Finished with this.
server._syncThreadCommitMutex.unlock();
SINFO("[checkpoint] Re-queuing abandoned command (from process) in sync thread");
server._commandQueue.push(move(command));
break;
Expand Down Expand Up @@ -1618,7 +1622,7 @@ void BedrockServer::postPoll(fd_map& fdm, uint64_t& nextActivity) {
} else {
// Otherwise, handle any default request.
int requestSize = request.deserialize(s->recvBuffer);
SConsumeFront(s->recvBuffer, requestSize);
s->recvBuffer.consumeFront(requestSize);
deserializationAttempts++;
}

Expand Down
2 changes: 1 addition & 1 deletion libstuff/SData.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ int SData::deserialize(const string& fromString) {
return (SParseHTTP(fromString, methodLine, nameValueMap, content));
}

int SData::deserialize(const char* buffer, int length) {
int SData::deserialize(const char* buffer, size_t length) {
return (SParseHTTP(buffer, length, methodLine, nameValueMap, content));
}

Expand Down
9 changes: 6 additions & 3 deletions libstuff/SData.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
#pragma once

#include "libstuff.h"

// --------------------------------------------------------------------------
// A very simple HTTP-like structure consisting of a method line, a table,
// and a content body.
Expand Down Expand Up @@ -94,7 +92,12 @@ struct SData {
int deserialize(const string& rhs);

// Deserializes from a buffer
int deserialize(const char* buffer, int length);
int deserialize(const char* buffer, size_t length);

// Deserializes from an SFastBuffer.
int deserialize(const SFastBuffer& buf) {
return deserialize(buf.c_str(), buf.size());
}

// Initializes a new SData from a string. If there is no content provided,
// then use whatever data remains in the string as the content
Expand Down
72 changes: 72 additions & 0 deletions libstuff/SFastBuffer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
#include <libstuff/libstuff.h>

SFastBuffer::SFastBuffer() : front(0) {
}

SFastBuffer::SFastBuffer(const string& str) : front(0), data(str) {
}

bool SFastBuffer::empty() const {
return size() == 0;
}

size_t SFastBuffer::size() const {
return data.size() - front;
}

const char* SFastBuffer::c_str() const {
return data.data() + front;
}

void SFastBuffer::clear() {
front = 0;
data.clear();
}

void SFastBuffer::consumeFront(size_t bytes) {
front += bytes;

// If we're all caught up, reset.
if (front == data.size()) {
clear();
}
}

void SFastBuffer::append(const char* buffer, size_t bytes) {
// When will we condense everything to the front of the buffer?
// When:
// 1. We're not already at the front of the buffer (this implies there's data in the buffer).
// 2. We'd have to do a realloc anyway because our buffer's not big enough for the new string (including the
// existing consumed buffer).
if (front && (data.capacity() - data.size() < bytes)) {
memmove(&data[0], data.data() + front, size());
data.resize(size());
front = 0;

// If the capacity is more than 4x the size we need, let's give some memory back.
if (data.capacity() > (data.size() + bytes) * 4) {
data.shrink_to_fit();
}
}

// After the resize, we may or may not need to actually reallocate. We can append now and let the string
// implementation decide if it needs more room.
data.append(buffer, bytes);
}

SFastBuffer& SFastBuffer::operator+=(const string& rhs) {
append(rhs.c_str(), rhs.size());
return *this;
}

SFastBuffer& SFastBuffer::operator=(const string& rhs) {
front = 0;
data = rhs;
return *this;
}

ostream& operator<<(ostream& os, const SFastBuffer& buf)
{
os << buf.c_str();
return os;
}
19 changes: 19 additions & 0 deletions libstuff/SFastBuffer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#pragma once
class SFastBuffer {
public:
SFastBuffer();
SFastBuffer(const string& str);
bool empty() const;
size_t size() const;
const char* c_str() const;
void clear();
void consumeFront(size_t bytes);
void append(const char* buffer, size_t bytes);
SFastBuffer& operator+=(const string& rhs);
SFastBuffer& operator=(const string& rhs);

private:
size_t front;
string data;
};
ostream& operator<<(ostream& os, const SFastBuffer& buf);
2 changes: 1 addition & 1 deletion libstuff/SHTTPSManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ void SStandaloneHTTPSManager::postPoll(fd_map& fdm, uint64_t& nextActivity, list
bool specificallyTimedOut = timeoutIt != transactionTimeouts.end() && timeoutIt->second < now;
if (size) {
// Consume how much we read.
SConsumeFront(active->s->recvBuffer, size);
active->s->recvBuffer.consumeFront(size);

// 200OK or any content?
active->finished = now;
Expand Down
8 changes: 4 additions & 4 deletions libstuff/SSSLState.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,13 +146,13 @@ void SSSLClose(SSSLState* ssl) {
}

// --------------------------------------------------------------------------
int SSSLSend(SSSLState* ssl, const string& buffer) {
int SSSLSend(SSSLState* ssl, const SFastBuffer& buffer) {
// Unwind the buffer
return SSSLSend(ssl, buffer.c_str(), (int)buffer.size());
}

// --------------------------------------------------------------------------
bool SSSLSendConsume(SSSLState* ssl, string& sendBuffer) {
bool SSSLSendConsume(SSSLState* ssl, SFastBuffer& sendBuffer) {
// Send as much as we can and return whether the socket is still alive
if (sendBuffer.empty()) {
return true;
Expand All @@ -161,7 +161,7 @@ bool SSSLSendConsume(SSSLState* ssl, string& sendBuffer) {
// Nothing to send, assume we're alive
int numSent = SSSLSend(ssl, sendBuffer);
if (numSent > 0) {
SConsumeFront(sendBuffer, numSent);
sendBuffer.consumeFront(numSent);
}

// Done!
Expand All @@ -184,7 +184,7 @@ bool SSSLSendAll(SSSLState* ssl, const string& buffer) {
}

// --------------------------------------------------------------------------
bool SSSLRecvAppend(SSSLState* ssl, string& recvBuffer) {
bool SSSLRecvAppend(SSSLState* ssl, SFastBuffer& recvBuffer) {
// Keep trying to receive as long as we can
SASSERT(ssl);
char buffer[1024 * 16];
Expand Down
7 changes: 4 additions & 3 deletions libstuff/SSSLState.h
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#pragma once
#include <libstuff/libstuff.h>
#include <mbedtls/ssl.h>
#include <mbedtls/entropy.h>
#include <mbedtls/ctr_drbg.h>
Expand All @@ -18,11 +19,11 @@ struct SSSLState {
// SSL helpers
extern SSSLState* SSSLOpen(int s, SX509* x509);
extern int SSSLSend(SSSLState* ssl, const char* buffer, int length);
extern int SSSLSend(SSSLState* ssl, const string& buffer);
extern bool SSSLSendConsume(SSSLState* ssl, string& sendBuffer);
extern int SSSLSend(SSSLState* ssl, const SFastBuffer& buffer);
extern bool SSSLSendConsume(SSSLState* ssl, SFastBuffer& sendBuffer);
extern bool SSSLSendAll(SSSLState* ssl, const string& buffer);
extern int SSSLRecv(SSSLState* ssl, char* buffer, int length);
extern bool SSSLRecvAppend(SSSLState* ssl, string& recvBuffer);
extern bool SSSLRecvAppend(SSSLState* ssl, SFastBuffer& recvBuffer);
extern string SSSLGetState(SSSLState* ssl);
extern void SSSLShutdown(SSSLState* ssl);
extern void SSSLClose(SSSLState* ssl);
2 changes: 1 addition & 1 deletion libstuff/STCPManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ bool STCPManager::Socket::sendBufferEmpty() {

string STCPManager::Socket::sendBufferCopy() {
lock_guard<decltype(sendRecvMutex)> lock(sendRecvMutex);
return sendBuffer;
return string(sendBuffer.c_str(), sendBuffer.size());
}

void STCPManager::Socket::setSendBuffer(const string& buffer) {
Expand Down
4 changes: 2 additions & 2 deletions libstuff/STCPManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ struct STCPManager {
// Attributes
int s;
sockaddr_in addr;
string recvBuffer;
SFastBuffer recvBuffer;
atomic<State> state;
bool connectFailure;
uint64_t openTime;
Expand Down Expand Up @@ -41,7 +41,7 @@ struct STCPManager {
// This is private because it's used by our synchronized send() functions. This requires it to only
// be accessed through the (also synchronized) wrapper functions above.
// NOTE: Currently there's no synchronization around `recvBuffer`. It can only be accessed by one thread.
string sendBuffer;
SFastBuffer sendBuffer;

// Each socket owns it's own SX509 object to avoid thread-safety issues reading/writing the same certificate in
// the underlying ssl code. Once assigned, the socket owns this object for it's lifetime and will delete it
Expand Down
12 changes: 8 additions & 4 deletions libstuff/STCPNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
#define SLOGPREFIX "{" << name << "} "

STCPNode::STCPNode(const string& name_, const string& host, const uint64_t recvTimeout_)
: STCPServer(host), name(name_), recvTimeout(recvTimeout_), _deserializeTimer("STCPNode::deserialize"), _sConsumeFrontTimer("STCPNode::SConsumeFront") {
: STCPServer(host), name(name_), recvTimeout(recvTimeout_), _deserializeTimer("STCPNode::deserialize"),
_sConsumeFrontTimer("STCPNode::SConsumeFront"), _sAppendTimer("STCPNode::append") {
}

STCPNode::~STCPNode() {
Expand Down Expand Up @@ -103,7 +104,10 @@ void STCPNode::prePoll(fd_map& fdm) {

void STCPNode::postPoll(fd_map& fdm, uint64_t& nextActivity) {
// Process the sockets
STCPServer::postPoll(fdm);
{
AutoTimerTime appendTime(_sAppendTimer);
STCPServer::postPoll(fdm);
}

// Accept any new peers
Socket* socket = nullptr;
Expand All @@ -127,7 +131,7 @@ void STCPNode::postPoll(fd_map& fdm, uint64_t& nextActivity) {
int messageSize = message.deserialize(socket->recvBuffer);
if (messageSize) {
// What is it?
SConsumeFront(socket->recvBuffer, messageSize);
socket->recvBuffer.consumeFront(messageSize);
if (SIEquals(message.methodLine, "NODE_LOGIN")) {
// Got it -- can we asssociate with a peer?
bool foundIt = false;
Expand Down Expand Up @@ -209,7 +213,7 @@ void STCPNode::postPoll(fd_map& fdm, uint64_t& nextActivity) {
// Which message?
{
AutoTimerTime consumeTime(_sConsumeFrontTimer);
SConsumeFront(peer->s->recvBuffer, messageSize);
peer->s->recvBuffer.consumeFront(messageSize);
}
if (peer->s->recvBuffer.size() > 10'000) {
// Make in known if this buffer ever gets big.
Expand Down
1 change: 1 addition & 0 deletions libstuff/STCPNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -139,4 +139,5 @@ struct STCPNode : public STCPServer {

AutoTimer _deserializeTimer;
AutoTimer _sConsumeFrontTimer;
AutoTimer _sAppendTimer;
};
44 changes: 13 additions & 31 deletions libstuff/libstuff.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -251,13 +251,15 @@ bool SIContains(const string& lhs, const string& rhs) {
return SContains(SToLower(lhs), SToLower(rhs));
}

bool SStartsWith(const string& haystack, const string& needle)
{
if (needle.size() > haystack.size()) {
bool SStartsWith(const string& haystack, const string& needle) {
return SStartsWith(haystack.c_str(), haystack.size(), needle.c_str(), needle.size());
}

bool SStartsWith(const char* haystack, size_t haystackSize, const char* needle, size_t needleSize) {
if (haystackSize < needleSize) {
return false;
}

return strncmp(haystack.c_str(), needle.c_str(), needle.size()) == 0;
return strncmp(haystack, needle, needleSize) == 0;
}

// --------------------------------------------------------------------------
Expand Down Expand Up @@ -579,27 +581,6 @@ bool SParseList(const char* ptr, list<string>& valueList, char separator) {
return (!component.empty());
}

// --------------------------------------------------------------------------
void SConsumeFront(string& lhs, ssize_t num) {
ssize_t lhsSize = lhs.size();
SASSERT(lhsSize >= num);
// If nothing, early out
if (!num){
return;
}

// If we're clearing the whole thing, early out
if (lhsSize == num) {
// Clear and done
lhs.clear();
return;
}

// Otherwise, move the end forward and resize
memmove(&lhs[0], &lhs[num], lhsSize - num);
lhs.resize(lhsSize - num);
}

// --------------------------------------------------------------------------
SData SParseCommandLine(int argc, char* argv[]) {
// Just walk across and find the pairs, then put the remainder on a list in the method
Expand Down Expand Up @@ -1105,7 +1086,7 @@ string SComposePOST(const STable& nameValueMap) {
}
}
string outStr = out.str();
SConsumeBack(outStr, 1); // Trim off trailing '&'
outStr.resize(outStr.size() - 1); // Trim off trailing '&'
return outStr;
}

Expand Down Expand Up @@ -1870,7 +1851,7 @@ bool SCheckNetworkErrorType(const string& logPrefix, const string& peer, int err
// --------------------------------------------------------------------------
// Receives data from a socket and appends to a string. Returns 'true' if
// the socket is still alive when done.
bool S_recvappend(int s, string& recvBuffer) {
bool S_recvappend(int s, SFastBuffer& recvBuffer) {
SASSERT(s);
// Figure out if this socket is blocking or non-blocking
int flags = fcntl(s, F_GETFL);
Expand Down Expand Up @@ -1915,14 +1896,15 @@ bool S_recvappend(int s, string& recvBuffer) {
}

// --------------------------------------------------------------------------
bool S_sendconsume(int s, string& sendBuffer) {
bool S_sendconsume(int s, SFastBuffer& sendBuffer) {
SASSERT(s);
// If empty, nothing to do
if (sendBuffer.empty()) {
return true; // Assume no error, still alive
}

if (SStartsWith(sendBuffer, "ESCALATE_RESPONSE")) {
// 17 is size of "ESCALATE_RESPONSE".
if (SStartsWith(sendBuffer.c_str(), sendBuffer.size(), "ESCALATE_RESPONSE", 17)) {
SData tempData;
tempData.deserialize(sendBuffer);
string id = tempData["id"];
Expand All @@ -1942,7 +1924,7 @@ bool S_sendconsume(int s, string& sendBuffer) {
<< " ms and sent " << numSent << " of " << sendBuffer.size() << " bytes." << errorMessage);

if (numSent > 0) {
SConsumeFront(sendBuffer, numSent);
sendBuffer.consumeFront(numSent);
}

// Exit if no error
Expand Down
Loading

0 comments on commit 1b81309

Please sign in to comment.