Skip to content

Commit

Permalink
fix(http,network):1.8.10
Browse files Browse the repository at this point in the history
1.解决BufferedFd中setSendCompleteCallback()在发送小数据时没有触发的问题;
2.修改http中的Server::Impl中回复之后关Socket的时机,改成完成发送之后再关。解决数据回复不全的问题;
  • Loading branch information
hevake committed Apr 17, 2024
1 parent 35d8574 commit a139493
Show file tree
Hide file tree
Showing 9 changed files with 96 additions and 34 deletions.
6 changes: 2 additions & 4 deletions examples/http/server/async_respond/async_respond.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
* of the source tree.
*/
#include <tbox/base/log.h>
#include <tbox/base/log_output.h>
#include <tbox/base/scope_exit.hpp>
#include <tbox/log/async_stdout_sink.h>
#include <tbox/event/signal_event.h>
Expand All @@ -38,10 +39,7 @@ int main(int argc, char **argv)
bind_addr = argv[1];
}

log::AsyncStdoutSink log;
log.enable();
log.enableColor(true);
log.setLevel(LOG_LEVEL_TRACE);
LogOutput_Enable();

LogInfo("enter");

Expand Down
7 changes: 2 additions & 5 deletions examples/http/server/router/router.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
* of the source tree.
*/
#include <tbox/base/log.h>
#include <tbox/base/log_output.h>
#include <tbox/base/scope_exit.hpp>
#include <tbox/log/async_stdout_sink.h>
#include <tbox/event/signal_event.h>
#include <tbox/http/server/server.h>
#include <tbox/http/server/router.h>
Expand All @@ -37,10 +37,7 @@ int main(int argc, char **argv)
bind_addr = argv[1];
}

log::AsyncStdoutSink log;
log.enable();
log.enableColor(true);
log.setLevel(LOG_LEVEL_TRACE);
LogOutput_Enable();

LogInfo("enter");

Expand Down
7 changes: 2 additions & 5 deletions examples/http/server/simple/simple.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
* of the source tree.
*/
#include <tbox/base/log.h>
#include <tbox/base/log_output.h>
#include <tbox/base/scope_exit.hpp>
#include <tbox/log/async_stdout_sink.h>
#include <tbox/event/signal_event.h>
#include <tbox/http/server/server.h>

Expand All @@ -36,10 +36,7 @@ int main(int argc, char **argv)
bind_addr = argv[1];
}

log::AsyncStdoutSink log;
log.enable();
log.enableColor(true);
log.setLevel(LOG_LEVEL_TRACE);
LogOutput_Enable();

LogInfo("enter");

Expand Down
49 changes: 32 additions & 17 deletions modules/http/server/server_imp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ bool Server::Impl::initialize(const network::SockAddr &bind_addr, int listen_bac
tcp_server_.setConnectedCallback(bind(&Impl::onTcpConnected, this, _1));
tcp_server_.setDisconnectedCallback(bind(&Impl::onTcpDisconnected, this, _1));
tcp_server_.setReceiveCallback(bind(&Impl::onTcpReceived, this, _1, _2), 0);
tcp_server_.setSendCompleteCallback(bind(&Impl::onTcpSendCompleted, this, _1));

state_ = State::kInited;
return true;
Expand Down Expand Up @@ -111,6 +112,8 @@ void Server::Impl::onTcpConnected(const TcpServer::ConnToken &ct)
void Server::Impl::onTcpDisconnected(const TcpServer::ConnToken &ct)
{
Connection *conn = static_cast<Connection*>(tcp_server_.getContext(ct));
TBOX_ASSERT(conn != nullptr);

conns_.erase(conn);
delete conn;
}
Expand Down Expand Up @@ -140,6 +143,7 @@ bool IsLastRequest(const Request *req)
void Server::Impl::onTcpReceived(const TcpServer::ConnToken &ct, Buffer &buff)
{
Connection *conn = static_cast<Connection*>(tcp_server_.getContext(ct));
TBOX_ASSERT(conn != nullptr);

//! 如果已被标记为最后的请求,就不应该再有请求来
if (conn->close_index != numeric_limits<int>::max()) {
Expand Down Expand Up @@ -182,6 +186,23 @@ void Server::Impl::onTcpReceived(const TcpServer::ConnToken &ct, Buffer &buff)
}
}

void Server::Impl::onTcpSendCompleted(const TcpServer::ConnToken &ct)
{
if (!tcp_server_.isClientValid(ct))
return;

Connection *conn = static_cast<Connection*>(tcp_server_.getContext(ct));
TBOX_ASSERT(conn != nullptr);

//! 如果最后一个已完成发送,则断开连接
if (conn->res_index > conn->close_index) {
LogTag();
tcp_server_.disconnect(ct);
conns_.erase(conn);
delete conn;
}
}

/**
* 为了保证管道化连接中Respond与Request的顺序一致性,要做特殊处理。
* 如果所提交的index不是当前需要回复的res_index,那么就先暂存起来,等前面的发送完成后再发送;
Expand All @@ -195,6 +216,8 @@ void Server::Impl::commitRespond(const TcpServer::ConnToken &ct, int index, Resp
}

Connection *conn = static_cast<Connection*>(tcp_server_.getContext(ct));
TBOX_ASSERT(conn != nullptr);

if (index == conn->res_index) {
//! 将当前的数据直接发送出去
{
Expand All @@ -205,16 +228,12 @@ void Server::Impl::commitRespond(const TcpServer::ConnToken &ct, int index, Resp
LogDbg("RES: [%s]", content.c_str());
}

//! 如果当前这个回复是最后一个,则需要断开连接
if (index == conn->close_index) {
tcp_server_.disconnect(ct);
conns_.erase(conn);
delete conn;
return;
}

++conn->res_index;

//! 如果当前这个回复是最后一个,则不需要发送缓存中的数据
if (conn->res_index > conn->close_index)
return;

//! 尝试发送 conn->res_buff 缓冲中的数据
auto &res_buff = conn->res_buff;
auto iter = res_buff.find(conn->res_index);
Expand All @@ -229,17 +248,13 @@ void Server::Impl::commitRespond(const TcpServer::ConnToken &ct, int index, Resp
LogDbg("RES: [%s]", content.c_str());
}

//! 如果当前这个回复是最后一个,则需要断开连接
if (index == conn->close_index) {
tcp_server_.disconnect(ct);
conns_.erase(conn);
delete conn;
return;
}

res_buff.erase(iter);

++conn->res_index;

//! 如果当前这个回复是最后一个,则不需要再发送缓存中的数据
if (conn->res_index > conn->close_index)
return;

iter = res_buff.find(conn->res_index);
}
} else {
Expand Down
1 change: 1 addition & 0 deletions modules/http/server/server_imp.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ class Server::Impl {
void onTcpConnected(const TcpServer::ConnToken &ct);
void onTcpDisconnected(const TcpServer::ConnToken &ct);
void onTcpReceived(const TcpServer::ConnToken &ct, Buffer &buff);
void onTcpSendCompleted(const TcpServer::ConnToken &ct);

//! 连接信息
struct Connection {
Expand Down
3 changes: 2 additions & 1 deletion modules/network/buffered_fd.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,9 @@ bool BufferedFd::send(const void *data_ptr, size_t data_size)
//! 则将剩余的数据放入到缓冲区
const uint8_t* p_remain = static_cast<const uint8_t*>(data_ptr) + wsize;
send_buff_.append(p_remain, (data_size - wsize));
sp_write_event_->enable(); //! 等待可写事件
}
sp_write_event_->enable(); //! 等待可写事件

} else { //! 否则就是出了错
if (errno == EAGAIN) { //! 文件操作繁忙
send_buff_.append(data_ptr, data_size);
Expand Down
54 changes: 53 additions & 1 deletion modules/network/buffered_fd_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,59 @@ TEST(BufferedFd, pipe_test)
delete sp_loop;
}

TEST(BufferedFd, sendHugeData)
//! 测试在发送小数据完成之后,有没有回调setSendCompleteCallback()对应的函数
TEST(BufferedFd, sendComplete_LittleData)
{
Loop* sp_loop = Loop::New();
ASSERT_TRUE(sp_loop);

int fds[2] = { 0 };
ASSERT_EQ(pipe(fds), 0);

int recv_count = 0;

//! 创建接收的BufferedFd
BufferedFd *read_buff_fd = new BufferedFd(sp_loop);
read_buff_fd->initialize(fds[0]);
read_buff_fd->setReceiveCallback(
[&] (Buffer &buff) {
recv_count += buff.readableSize();
buff.hasReadAll();
}, 0
);
read_buff_fd->enable();

bool is_send_completed = false;
//! 创建发送的BufferedFd
BufferedFd *write_buff_fd = new BufferedFd(sp_loop);
write_buff_fd->initialize(fds[1]);
write_buff_fd->setSendCompleteCallback(
[&] {
write_buff_fd->disable();
CHECK_CLOSE_RESET_FD(fds[1]);
is_send_completed = true;
}
);
write_buff_fd->enable();

//! 发送10B数据
std::vector<uint8_t> send_data(10, 0);
write_buff_fd->send(send_data.data(), send_data.size());

sp_loop->exitLoop(std::chrono::milliseconds(10));
sp_loop->runLoop();

EXPECT_EQ(recv_count, send_data.size());
EXPECT_TRUE(is_send_completed);

CHECK_CLOSE_RESET_FD(fds[0]);
delete read_buff_fd;
delete write_buff_fd;
delete sp_loop;
}

//! 测试在发送大数据完成之后,有没有回调setSendCompleteCallback()对应的函数
TEST(BufferedFd, sendComplete_HugeData)
{
Loop* sp_loop = Loop::New();
ASSERT_TRUE(sp_loop);
Expand Down
1 change: 1 addition & 0 deletions modules/network/tcp_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ void TcpServer::onTcpConnected(TcpConnection *new_conn)
ConnToken client = d_->conns.alloc(new_conn);
new_conn->setReceiveCallback(std::bind(&TcpServer::onTcpReceived, this, client, _1), d_->receive_threshold);
new_conn->setDisconnectedCallback(std::bind(&TcpServer::onTcpDisconnected, this, client));
new_conn->setSendCompleteCallback(std::bind(&TcpServer::onTcpSendCompleted, this, client));

++d_->cb_level;
if (d_->connected_cb)
Expand Down
2 changes: 1 addition & 1 deletion version.mk
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# TBOX版本号
TBOX_VERSION_MAJOR := 1
TBOX_VERSION_MINOR := 8
TBOX_VERSION_REVISION := 9
TBOX_VERSION_REVISION := 10

0 comments on commit a139493

Please sign in to comment.