Skip to content

Commit

Permalink
Merge branch 'master' into fix_ingest_behind_lose
Browse files Browse the repository at this point in the history
  • Loading branch information
ninsmiracle committed Oct 25, 2023
2 parents b939a0d + 21e25ca commit aac1034
Show file tree
Hide file tree
Showing 93 changed files with 639 additions and 404 deletions.
12 changes: 0 additions & 12 deletions .github/workflows/standardization_lint.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -50,18 +50,6 @@ jobs:
- uses: actions/checkout@v3
- uses: gaurav-nelson/[email protected]

related_issue:
name: Check issue
runs-on: ubuntu-latest
steps:
- uses: neofinancial/[email protected]
with:
token: ${{ secrets.GITHUB_TOKEN }}
titleFormat: '%title%'
quiet: true
bodyRegex: '#(\d+)'
bodyURLRegex: 'http(s?):\/\/(github.com)(\/apache)(\/incubator-pegasus)(\/issues)\/\d+'

dockerfile_linter:
name: Lint Dockerfile
runs-on: ubuntu-latest
Expand Down
2 changes: 1 addition & 1 deletion docker/pegasus-build-env/ubuntu1804/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ RUN add-apt-repository ppa:git-core/ppa -y; \
apt-get install pkg-config -y --no-install-recommends; \
rm -rf /var/lib/apt/lists/*

RUN pip3 install --upgrade pip && pip3 install --no-cache-dir cmake
RUN pip3 install --no-cache-dir --upgrade pip && pip3 install --no-cache-dir cmake

RUN wget --progress=dot:giga https://github.com/apache/thrift/archive/refs/tags/0.11.0.tar.gz -P /opt/thrift && \
cd /opt/thrift && tar xzf 0.11.0.tar.gz && cd thrift-0.11.0 && ./bootstrap.sh && \
Expand Down
3 changes: 1 addition & 2 deletions src/aio/test/aio.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,7 @@ class aio_test : public pegasus::encrypt_data_test_base
const std::string kTestFileName = "aio_test.txt";
};

// TODO(yingchun): ENCRYPTION: add enable encryption test.
INSTANTIATE_TEST_CASE_P(, aio_test, ::testing::Values(false));
INSTANTIATE_TEST_CASE_P(, aio_test, ::testing::Values(false, true));

TEST_P(aio_test, basic)
{
Expand Down
5 changes: 4 additions & 1 deletion src/block_service/fds/fds_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
#include "utils/TokenBucket.h"
#include "utils/autoref_ptr.h"
#include "utils/blob.h"
#include "utils/env.h"
#include "utils/error_code.h"
#include "utils/filesystem.h"
#include "utils/flags.h"
Expand Down Expand Up @@ -606,7 +607,8 @@ dsn::task_ptr fds_file_object::upload(const upload_request &req,
const std::string &local_file = req.input_local_name;
// get file size
int64_t file_sz = 0;
dsn::utils::filesystem::file_size(local_file, file_sz);
dsn::utils::filesystem::file_size(
local_file, dsn::utils::FileDataType::kSensitive, file_sz);

upload_response resp;
// TODO: we can cache the whole file in buffer, then upload the buffer rather than the
Expand Down Expand Up @@ -671,6 +673,7 @@ dsn::task_ptr fds_file_object::download(const download_request &req,
t->set_tracker(tracker);
download_response resp;

// TODO(yingchun): use rocksdb API to implement this.
std::shared_ptr<std::ofstream> handle(new std::ofstream(
req.output_local_name, std::ios::binary | std::ios::out | std::ios::trunc));
if (!handle->is_open()) {
Expand Down
7 changes: 4 additions & 3 deletions src/block_service/hdfs/hdfs_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -385,8 +385,8 @@ dsn::task_ptr hdfs_file_object::upload(const upload_request &req,
rocksdb::EnvOptions env_options;
env_options.use_direct_reads = FLAGS_enable_direct_io;
std::unique_ptr<rocksdb::SequentialFile> rfile;
auto s = rocksdb::Env::Default()->NewSequentialFile(
req.input_local_name, &rfile, env_options);
auto s = dsn::utils::PegasusEnv(dsn::utils::FileDataType::kSensitive)
->NewSequentialFile(req.input_local_name, &rfile, env_options);
if (!s.ok()) {
LOG_ERROR(
"open local file '{}' failed, err = {}", req.input_local_name, s.ToString());
Expand Down Expand Up @@ -552,7 +552,8 @@ dsn::task_ptr hdfs_file_object::download(const download_request &req,
rocksdb::EnvOptions env_options;
env_options.use_direct_writes = FLAGS_enable_direct_io;
std::unique_ptr<rocksdb::WritableFile> wfile;
auto s = rocksdb::Env::Default()->NewWritableFile(target_file, &wfile, env_options);
auto s = dsn::utils::PegasusEnv(dsn::utils::FileDataType::kSensitive)
->NewWritableFile(target_file, &wfile, env_options);
if (!s.ok()) {
LOG_ERROR("create local file '{}' failed, err = {}", target_file, s.ToString());
break;
Expand Down
17 changes: 10 additions & 7 deletions src/block_service/local/local_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,8 @@ error_code local_file_object::load_metadata()

std::string metadata_path = local_service::get_metafile(file_name());
std::string data;
auto s = rocksdb::ReadFileToString(rocksdb::Env::Default(), metadata_path, &data);
auto s = rocksdb::ReadFileToString(
dsn::utils::PegasusEnv(dsn::utils::FileDataType::kSensitive), metadata_path, &data);
if (!s.ok()) {
LOG_WARNING("read file '{}' failed, err = {}", metadata_path, s.ToString());
return ERR_FS_INTERNAL;
Expand All @@ -294,10 +295,11 @@ error_code local_file_object::store_metadata()
meta.size = _size;
std::string data = nlohmann::json(meta).dump();
std::string metadata_path = local_service::get_metafile(file_name());
auto s = rocksdb::WriteStringToFile(rocksdb::Env::Default(),
rocksdb::Slice(data),
metadata_path,
/* should_sync */ true);
auto s =
rocksdb::WriteStringToFile(dsn::utils::PegasusEnv(dsn::utils::FileDataType::kSensitive),
rocksdb::Slice(data),
metadata_path,
/* should_sync */ true);
if (!s.ok()) {
LOG_WARNING("store to metadata file {} failed, err={}", metadata_path, s.ToString());
return ERR_FS_INTERNAL;
Expand Down Expand Up @@ -341,7 +343,7 @@ dsn::task_ptr local_file_object::write(const write_request &req,

do {
auto s = rocksdb::WriteStringToFile(
rocksdb::Env::Default(),
dsn::utils::PegasusEnv(dsn::utils::FileDataType::kSensitive),
rocksdb::Slice(req.buffer.data(), req.buffer.length()),
file_name(),
/* should_sync */ true);
Expand Down Expand Up @@ -418,7 +420,8 @@ dsn::task_ptr local_file_object::read(const read_request &req,
rocksdb::EnvOptions env_options;
env_options.use_direct_reads = FLAGS_enable_direct_io;
std::unique_ptr<rocksdb::SequentialFile> sfile;
auto s = rocksdb::Env::Default()->NewSequentialFile(file_name(), &sfile, env_options);
auto s = dsn::utils::PegasusEnv(dsn::utils::FileDataType::kSensitive)
->NewSequentialFile(file_name(), &sfile, env_options);
if (!s.ok()) {
LOG_WARNING("open file '{}' failed, err = {}", file_name(), s.ToString());
resp.err = ERR_FS_INTERNAL;
Expand Down
1 change: 1 addition & 0 deletions src/block_service/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ set(MY_PROJ_LIBS
dsn.block_service.fds
dsn.block_service.hdfs
dsn_runtime
dsn_utils
galaxy-fds-sdk-cpp
PocoNet
PocoFoundation
Expand Down
3 changes: 1 addition & 2 deletions src/block_service/test/block_service_manager_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,7 @@ class block_service_manager_test : public pegasus::encrypt_data_test_base
std::string FILE_NAME = "test_file";
};

// TODO(yingchun): ENCRYPTION: add enable encryption test.
INSTANTIATE_TEST_CASE_P(, block_service_manager_test, ::testing::Values(false));
INSTANTIATE_TEST_CASE_P(, block_service_manager_test, ::testing::Values(false, true));

TEST_P(block_service_manager_test, remote_file_not_exist)
{
Expand Down
1 change: 1 addition & 0 deletions src/block_service/test/fds_service_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ void FDSClientTest::TearDown() {}

DEFINE_TASK_CODE(lpc_btest, TASK_PRIORITY_HIGH, dsn::THREAD_POOL_DEFAULT)

// TODO(yingchun): add encryption test when FDSClient supports encryption.
TEST_F(FDSClientTest, test_basic_operation)
{
const char *files[] = {"/fdstest/fdstest1/test1/test1",
Expand Down
6 changes: 3 additions & 3 deletions src/block_service/test/hdfs_service_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ void HDFSClientTest::generate_test_file(const std::string &filename)
{
int lines = FLAGS_num_test_file_lines;
std::unique_ptr<rocksdb::WritableFile> wfile;
auto s = rocksdb::Env::Default()->NewWritableFile(filename, &wfile, rocksdb::EnvOptions());
auto s = dsn::utils::PegasusEnv(dsn::utils::FileDataType::kSensitive)
->NewWritableFile(filename, &wfile, rocksdb::EnvOptions());
ASSERT_TRUE(s.ok()) << s.ToString();
for (int i = 0; i < lines; ++i) {
rocksdb::Slice data(fmt::format("{:04}d_this_is_a_simple_test_file\n", i));
Expand Down Expand Up @@ -128,8 +129,7 @@ void HDFSClientTest::write_test_files_async(const std::string &local_test_path,
}
}

// TODO(yingchun): ENCRYPTION: add enable encryption test.
INSTANTIATE_TEST_CASE_P(, HDFSClientTest, ::testing::Values(false));
INSTANTIATE_TEST_CASE_P(, HDFSClientTest, ::testing::Values(false, true));

TEST_P(HDFSClientTest, test_hdfs_read_write)
{
Expand Down
34 changes: 19 additions & 15 deletions src/block_service/test/local_service_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@

#include "block_service/local/local_service.h"
#include "test_util/test_util.h"
#include "utils/env.h"
#include "utils/error_code.h"

namespace dsn {
Expand All @@ -47,8 +48,7 @@ class local_service_test : public pegasus::encrypt_data_test_base
{
};

// TODO(yingchun): ENCRYPTION: add enable encryption test.
INSTANTIATE_TEST_CASE_P(, local_service_test, ::testing::Values(false));
INSTANTIATE_TEST_CASE_P(, local_service_test, ::testing::Values(false, true));

TEST_P(local_service_test, store_metadata)
{
Expand All @@ -60,7 +60,8 @@ TEST_P(local_service_test, store_metadata)
ASSERT_TRUE(boost::filesystem::exists(meta_file_path));

std::string data;
auto s = rocksdb::ReadFileToString(rocksdb::Env::Default(), meta_file_path, &data);
auto s = rocksdb::ReadFileToString(
dsn::utils::PegasusEnv(dsn::utils::FileDataType::kSensitive), meta_file_path, &data);
ASSERT_TRUE(s.ok()) << s.ToString();

nlohmann::json j = nlohmann::json::parse(data);
Expand All @@ -76,10 +77,11 @@ TEST_P(local_service_test, load_metadata)
{
nlohmann::json j({{"md5", "abcde"}, {"size", 5}});
std::string data = j.dump();
auto s = rocksdb::WriteStringToFile(rocksdb::Env::Default(),
rocksdb::Slice(data),
meta_file_path,
/* should_sync */ true);
auto s =
rocksdb::WriteStringToFile(dsn::utils::PegasusEnv(dsn::utils::FileDataType::kSensitive),
rocksdb::Slice(data),
meta_file_path,
/* should_sync */ true);
ASSERT_TRUE(s.ok()) << s.ToString();

ASSERT_EQ(ERR_OK, file.load_metadata());
Expand All @@ -88,10 +90,11 @@ TEST_P(local_service_test, load_metadata)
}

{
auto s = rocksdb::WriteStringToFile(rocksdb::Env::Default(),
rocksdb::Slice("invalid json string"),
meta_file_path,
/* should_sync */ true);
auto s =
rocksdb::WriteStringToFile(dsn::utils::PegasusEnv(dsn::utils::FileDataType::kSensitive),
rocksdb::Slice("invalid json string"),
meta_file_path,
/* should_sync */ true);
ASSERT_TRUE(s.ok()) << s.ToString();

local_file_object file2("a.txt");
Expand All @@ -101,10 +104,11 @@ TEST_P(local_service_test, load_metadata)
{
nlohmann::json j({{"md5", "abcde"}, {"no such key", "illegal"}});
std::string data = j.dump();
auto s = rocksdb::WriteStringToFile(rocksdb::Env::Default(),
rocksdb::Slice(data),
meta_file_path,
/* should_sync */ true);
auto s =
rocksdb::WriteStringToFile(dsn::utils::PegasusEnv(dsn::utils::FileDataType::kSensitive),
rocksdb::Slice(data),
meta_file_path,
/* should_sync */ true);
ASSERT_TRUE(s.ok()) << s.ToString();

local_file_object file2("a.txt");
Expand Down
6 changes: 3 additions & 3 deletions src/common/fs_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -172,10 +172,10 @@ class fs_manager
friend class replica_disk_migrator;
friend class replica_disk_test_base;
friend class open_replica_test;
FRIEND_TEST(fs_manager, find_best_dir_for_new_replica);
FRIEND_TEST(fs_manager, get_dir_node);
FRIEND_TEST(fs_manager_test, find_best_dir_for_new_replica);
FRIEND_TEST(fs_manager_test, get_dir_node);
FRIEND_TEST(open_replica_test, open_replica_add_decree_and_ballot_check);
FRIEND_TEST(replica_error_test, test_auto_trash_of_corruption);
FRIEND_TEST(replica_test, test_auto_trash_of_corruption);
};
} // replication
} // dsn
22 changes: 15 additions & 7 deletions src/common/test/fs_manager_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
* under the License.
*/

// IWYU pragma: no_include <gtest/gtest-param-test.h>
// IWYU pragma: no_include <ext/alloc_traits.h>
// IWYU pragma: no_include <gtest/gtest-message.h>
// IWYU pragma: no_include <gtest/gtest-test-part.h>
Expand All @@ -33,6 +34,7 @@
#include "common/gpid.h"
#include "common/replication_other_types.h"
#include "metadata_types.h"
#include "test_util/test_util.h"
#include "utils/fail_point.h"
#include "utils/filesystem.h"

Expand All @@ -47,7 +49,13 @@ TEST(dir_node, replica_dir)
ASSERT_EQ("path/1.0.test", dn.replica_dir("test", gpid(1, 0)));
}

TEST(fs_manager, initialize)
class fs_manager_test : public pegasus::encrypt_data_test_base
{
};

INSTANTIATE_TEST_CASE_P(, fs_manager_test, ::testing::Values(false, true));

TEST_P(fs_manager_test, initialize)
{
fail::setup();
struct broken_disk_test
Expand All @@ -69,7 +77,7 @@ TEST(fs_manager, initialize)
fail::teardown();
}

TEST(fs_manager, dir_update_disk_status)
TEST_P(fs_manager_test, dir_update_disk_status)
{
struct update_disk_status
{
Expand All @@ -94,7 +102,7 @@ TEST(fs_manager, dir_update_disk_status)
}
}

TEST(fs_manager, get_dir_node)
TEST_P(fs_manager_test, get_dir_node)
{
fs_manager fm;
fm.initialize({"./data1"}, {"data1"});
Expand All @@ -115,7 +123,7 @@ TEST(fs_manager, get_dir_node)
ASSERT_EQ(nullptr, fm.get_dir_node(base_dir + "/data2/replica1"));
}

TEST(fs_manager, find_replica_dir)
TEST_P(fs_manager_test, find_replica_dir)
{
fs_manager fm;
fm.initialize({"./data1", "./data2", "./data3"}, {"data1", "data2", "data3"});
Expand All @@ -137,7 +145,7 @@ TEST(fs_manager, find_replica_dir)
ASSERT_EQ(dn, dn1);
}

TEST(fs_manager, create_replica_dir_if_necessary)
TEST_P(fs_manager_test, create_replica_dir_if_necessary)
{
fs_manager fm;

Expand All @@ -154,7 +162,7 @@ TEST(fs_manager, create_replica_dir_if_necessary)
ASSERT_EQ("data1", dn->tag);
}

TEST(fs_manager, create_child_replica_dir)
TEST_P(fs_manager_test, create_child_replica_dir)
{
fs_manager fm;
fm.initialize({"./data1", "./data2", "./data3"}, {"data1", "data2", "data3"});
Expand All @@ -174,7 +182,7 @@ TEST(fs_manager, create_child_replica_dir)
ASSERT_EQ(dir, child_dir);
}

TEST(fs_manager, find_best_dir_for_new_replica)
TEST_P(fs_manager_test, find_best_dir_for_new_replica)
{
// dn1 | 1.0, 1.1 +1.6
// dn2 | 1.2, 1.3 +1.7 2.0
Expand Down
3 changes: 2 additions & 1 deletion src/geo/bench/bench.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

#include "geo/lib/geo_client.h"
#include "geo/lib/latlng_codec.h"
#include "utils/env.h"
#include "utils/errors.h"
#include "utils/fmt_logging.h"
#include "utils/string_conv.h"
Expand Down Expand Up @@ -110,7 +111,7 @@ int main(int argc, char **argv)
RESULT_COUNT
};
auto statistics = rocksdb::CreateDBStatistics();
rocksdb::Env *env = rocksdb::Env::Default();
rocksdb::Env *env = dsn::utils::PegasusEnv(dsn::utils::FileDataType::kSensitive);
uint64_t start = env->NowNanos();
std::atomic<uint64_t> count(test_count);
dsn::utils::notify_event get_completed;
Expand Down
5 changes: 5 additions & 0 deletions src/geo/test/geo_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/

#include <base/pegasus_key_schema.h>
// IWYU pragma: no_include <gtest/gtest-param-test.h>
// IWYU pragma: no_include <gtest/gtest-message.h>
// IWYU pragma: no_include <gtest/gtest-test-part.h>
#include <gtest/gtest.h>
Expand Down Expand Up @@ -55,6 +56,10 @@ namespace geo {

DSN_DECLARE_int32(min_level);

// TODO(yingchun): it doesn't make sense to derive from pegasus::encrypt_data_test_base to test
// encryption or non-encryption senarios, because the Pegasus cluster has been started with a
// fixed value of FLAGS_encrypt_data_at_rest.
// We can test the senarios after clearing and restarting the cluster.
class geo_client_test : public ::testing::Test
{
public:
Expand Down
Loading

0 comments on commit aac1034

Please sign in to comment.