Skip to content

Commit

Permalink
apacheGH-40343: [C++] Move S3FileSystem to the registry
Browse files Browse the repository at this point in the history
  • Loading branch information
bkietz committed Sep 17, 2024
1 parent 1fd8a25 commit 39e2e0e
Show file tree
Hide file tree
Showing 17 changed files with 278 additions and 70 deletions.
6 changes: 6 additions & 0 deletions cpp/cmake_modules/DefineOptions.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,12 @@ takes precedence over ccache if a storage backend is configured" ON)
DEPENDS
ARROW_FILESYSTEM)

define_option(ARROW_S3_MODULE
"Build the Arrow S3 filesystem as a dynamic module"
OFF
DEPENDS
ARROW_S3)

define_option(ARROW_SKYHOOK
"Build the Skyhook libraries"
OFF
Expand Down
12 changes: 12 additions & 0 deletions cpp/src/arrow/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -876,6 +876,18 @@ if(ARROW_FILESYSTEM)
foreach(ARROW_FILESYSTEM_TARGET ${ARROW_FILESYSTEM_TARGETS})
target_link_libraries(${ARROW_FILESYSTEM_TARGET} PRIVATE ${AWSSDK_LINK_LIBRARIES})
endforeach()

if(ARROW_S3_MODULE)
if(NOT ARROW_BUILD_SHARED)
message(FATAL_ERROR "ARROW_S3_MODULE without shared libarrow is not supported")
endif()

add_library(arrow_s3fs MODULE filesystem/s3fs_module.cc filesystem/s3fs.cc)
target_link_libraries(arrow_s3fs PRIVATE ${AWSSDK_LINK_LIBRARIES} arrow_shared)
set_source_files_properties(filesystem/s3fs.cc filesystem/s3fs_module.cc
PROPERTIES SKIP_PRECOMPILE_HEADERS ON
SKIP_UNITY_BUILD_INCLUSION ON)
endif()
endif()

list(APPEND ARROW_TESTING_SHARED_LINK_LIBS ${ARROW_GTEST_GMOCK})
Expand Down
1 change: 1 addition & 0 deletions cpp/src/arrow/adapters/orc/util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "arrow/adapters/orc/util.h"

#include <cmath>
#include <sstream>
#include <string>
#include <string_view>
#include <vector>
Expand Down
18 changes: 18 additions & 0 deletions cpp/src/arrow/filesystem/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,24 @@ if(ARROW_S3)
target_link_libraries(arrow-filesystem-s3fs-benchmark PRIVATE parquet_shared)
endif()
endif()

if(ARROW_S3_MODULE)
add_arrow_test(s3fs_module_test
SOURCES
s3fs_module_test.cc
s3_test_util.cc
EXTRA_LABELS
filesystem
DEFINITIONS
ARROW_S3_LIBPATH="$<TARGET_FILE:arrow_s3fs>"
EXTRA_LINK_LIBS
Boost::filesystem
Boost::system)
target_compile_definitions(arrow-filesystem-test
PUBLIC ARROW_S3_LIBPATH="$<TARGET_FILE:arrow_s3fs>")
target_sources(arrow-filesystem-test PUBLIC s3fs_module_test.cc s3_test_util.cc)
target_link_libraries(arrow-filesystem-test PUBLIC Boost::filesystem Boost::system)
endif()
endif()

if(ARROW_HDFS)
Expand Down
44 changes: 28 additions & 16 deletions cpp/src/arrow/filesystem/filesystem.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,6 @@
#ifdef ARROW_HDFS
# include "arrow/filesystem/hdfs.h"
#endif
#ifdef ARROW_S3
# include "arrow/filesystem/s3fs.h"
#endif
#include "arrow/filesystem/localfs.h"
#include "arrow/filesystem/mockfs.h"
#include "arrow/filesystem/path_util.h"
Expand Down Expand Up @@ -700,6 +697,29 @@ class FileSystemFactoryRegistry {
return &registry;
}

Status Unregister(const std::string& scheme) {
std::shared_lock lock{mutex_};
RETURN_NOT_OK(CheckValid());

auto it = scheme_to_factory_.find(scheme);
if (it == scheme_to_factory_.end()) {
return Status::KeyError("No factories found for scheme ", scheme,
", can't unregister");
}

std::function<void()> finalizer;
if (it->second.ok()) {
finalizer = it->second.ValueOrDie().finalizer;
}
scheme_to_factory_.erase(it);
lock.unlock();

if (finalizer) {
finalizer();
}
return Status::OK();
}

Result<const FileSystemFactory*> FactoryForScheme(const std::string& scheme) {
std::shared_lock lock{mutex_};
RETURN_NOT_OK(CheckValid());
Expand Down Expand Up @@ -749,7 +769,7 @@ class FileSystemFactoryRegistry {
if (finalized_) return;

for (const auto& [_, registered_or_error] : scheme_to_factory_) {
if (!registered_or_error.ok()) continue;
if (!registered_or_error.ok() || !registered_or_error->finalizer) continue;
registered_or_error->finalizer();
}
finalized_ = true;
Expand Down Expand Up @@ -819,6 +839,10 @@ FileSystemRegistrar::FileSystemRegistrar(std::string scheme, FileSystemFactory f

namespace internal {
void* GetFileSystemRegistry() { return FileSystemFactoryRegistry::GetInstance(); }

Status UnregisterFileSystemFactory(const std::string& scheme) {
return FileSystemFactoryRegistry::GetInstance()->Unregister(scheme);
}
} // namespace internal

Status LoadFileSystemFactories(const char* libpath) {
Expand Down Expand Up @@ -896,18 +920,6 @@ Result<std::shared_ptr<FileSystem>> FileSystemFromUriReal(const Uri& uri,
"without HDFS support");
#endif
}
if (scheme == "s3") {
#ifdef ARROW_S3
RETURN_NOT_OK(EnsureS3Initialized());
ARROW_ASSIGN_OR_RAISE(auto options, S3Options::FromUri(uri, out_path));
ARROW_ASSIGN_OR_RAISE(auto s3fs, S3FileSystem::Make(options, io_context));
return s3fs;
#else
return Status::NotImplemented(
"Got S3 URI but Arrow compiled "
"without S3 support");
#endif
}

if (scheme == "mock") {
// MockFileSystem does not have an
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/filesystem/filesystem.h
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ class ARROW_EXPORT FileSystem
virtual Result<std::string> PathFromUri(const std::string& uri_string) const;

/// \brief Make a URI from which FileSystemFromUri produces an equivalent filesystem
/// \param path The path component to use in the resulting URI
/// \param path The path component to use in the resulting URI. Must be absolute.
/// \return A URI string, or an error if an equivalent URI cannot be produced
virtual Result<std::string> MakeUri(std::string path) const;

Expand Down
9 changes: 8 additions & 1 deletion cpp/src/arrow/filesystem/localfs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,14 @@ Result<std::string> LocalFileSystem::PathFromUri(const std::string& uri_string)

Result<std::string> LocalFileSystem::MakeUri(std::string path) const {
ARROW_ASSIGN_OR_RAISE(path, DoNormalizePath(std::move(path)));
return "file://" + path + (options_.use_mmap ? "?use_mmap" : "");
if (!internal::DetectAbsolutePath(path)) {
return Status::Invalid("MakeUri requires an absolute path, got ", path);
}
ARROW_ASSIGN_OR_RAISE(auto uri, util::UriFromAbsolutePath(path));
if (uri[0] == '/') {
uri = "file://" + uri;
}
return uri + (options_.use_mmap ? "?use_mmap" : "");
}

bool LocalFileSystem::Equals(const FileSystem& other) const {
Expand Down
7 changes: 7 additions & 0 deletions cpp/src/arrow/filesystem/localfs_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <utility>
#include <vector>

#include <gmock/gmock-matchers.h>
#include <gtest/gtest.h>

#include "arrow/filesystem/filesystem.h"
Expand Down Expand Up @@ -428,9 +429,15 @@ TYPED_TEST(TestLocalFS, FileSystemFromUriFile) {

this->TestLocalUri("file:///_?use_mmap", "/_");
if (this->path_formatter_.supports_uri()) {
EXPECT_THAT(this->fs_->MakeUri(""), Raises(StatusCode::Invalid));
EXPECT_THAT(this->fs_->MakeUri("a/b"), Raises(StatusCode::Invalid));

ASSERT_TRUE(this->local_fs_->options().use_mmap);
ASSERT_OK_AND_ASSIGN(auto uri, this->fs_->MakeUri("/_"));
EXPECT_EQ(uri, "file:///_?use_mmap");

ASSERT_OK_AND_ASSIGN(uri, this->fs_->MakeUri("/hello world/b/c"));
EXPECT_EQ(uri, "file:///hello%20world/b/c?use_mmap");
}

#ifdef _WIN32
Expand Down
90 changes: 64 additions & 26 deletions cpp/src/arrow/filesystem/s3fs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@
#include "arrow/util/string.h"
#include "arrow/util/task_group.h"
#include "arrow/util/thread_pool.h"
#include "arrow/util/value_parsing.h"

namespace arrow::fs {

Expand Down Expand Up @@ -168,6 +169,8 @@ static constexpr const char kAwsEndpointUrlEnvVar[] = "AWS_ENDPOINT_URL";
static constexpr const char kAwsEndpointUrlS3EnvVar[] = "AWS_ENDPOINT_URL_S3";
static constexpr const char kAwsDirectoryContentType[] = "application/x-directory";

using std::operator""s;

// -----------------------------------------------------------------------
// S3ProxyOptions implementation

Expand Down Expand Up @@ -3031,6 +3034,30 @@ Result<std::string> S3FileSystem::PathFromUri(const std::string& uri_string) con
internal::AuthorityHandlingBehavior::kPrepend);
}

Result<std::string> S3FileSystem::MakeUri(std::string path) const {
if (path.length() <= 1 || path[0] != '/') {
return Status::Invalid("MakeUri requires an absolute, non-root path, got ", path);
}
ARROW_ASSIGN_OR_RAISE(auto uri, util::UriFromAbsolutePath(path));
if (!options().GetAccessKey().empty()) {
uri = "s3://" + options().GetAccessKey() + ":" + options().GetSecretKey() + "@" +
uri.substr("file:///"s.size());
} else {
uri = "s3" + uri.substr("file"s.size());
}
uri += "?";
uri += "region=" + util::UriEscape(options().region);
uri += "&";
uri += "scheme=" + options().scheme;
uri += "&";
uri += "endpoint_override=" + util::UriEscape(options().endpoint_override);
uri += "&";
uri += "allow_bucket_creation="s + (options().allow_bucket_creation ? "1" : "0");
uri += "&";
uri += "allow_bucket_deletion="s + (options().allow_bucket_deletion ? "1" : "0");
return uri;
}

S3Options S3FileSystem::options() const { return impl_->options(); }

std::string S3FileSystem::region() const { return impl_->region(); }
Expand Down Expand Up @@ -3492,32 +3519,33 @@ bool IsS3Finalized() { return GetAwsInstance()->IsFinalized(); }

S3GlobalOptions S3GlobalOptions::Defaults() {
auto log_level = S3LogLevel::Fatal;

auto result = arrow::internal::GetEnvVar("ARROW_S3_LOG_LEVEL");

if (result.ok()) {
// Extract, trim, and downcase the value of the environment variable
auto value =
arrow::internal::AsciiToLower(arrow::internal::TrimString(result.ValueUnsafe()));

if (value == "fatal") {
log_level = S3LogLevel::Fatal;
} else if (value == "error") {
log_level = S3LogLevel::Error;
} else if (value == "warn") {
log_level = S3LogLevel::Warn;
} else if (value == "info") {
log_level = S3LogLevel::Info;
} else if (value == "debug") {
log_level = S3LogLevel::Debug;
} else if (value == "trace") {
log_level = S3LogLevel::Trace;
} else if (value == "off") {
log_level = S3LogLevel::Off;
}
}

return S3GlobalOptions{log_level};
int num_event_loop_threads = 1;
// Extract, trim, and downcase the value of the environment variable
auto value = arrow::internal::GetEnvVar("ARROW_S3_LOG_LEVEL")
.Map(arrow::internal::AsciiToLower)
.Map(arrow::internal::TrimString)
.ValueOr("fatal");
if (value == "fatal") {
log_level = S3LogLevel::Fatal;
} else if (value == "error") {
log_level = S3LogLevel::Error;
} else if (value == "warn") {
log_level = S3LogLevel::Warn;
} else if (value == "info") {
log_level = S3LogLevel::Info;
} else if (value == "debug") {
log_level = S3LogLevel::Debug;
} else if (value == "trace") {
log_level = S3LogLevel::Trace;
} else if (value == "off") {
log_level = S3LogLevel::Off;
}

value = arrow::internal::GetEnvVar("ARROW_S3_THREADS").ValueOr("1");
if (uint32_t u; ::arrow::internal::ParseUnsigned(value.data(), value.size(), &u)) {
num_event_loop_threads = u;
}
return S3GlobalOptions{log_level, num_event_loop_threads};
}

// -----------------------------------------------------------------------
Expand All @@ -3535,4 +3563,14 @@ Result<std::string> ResolveS3BucketRegion(const std::string& bucket) {
return resolver->ResolveRegion(bucket);
}

auto kS3FileSystemModule = ARROW_REGISTER_FILESYSTEM(
"s3",
[](const arrow::util::Uri& uri, const io::IOContext& io_context,
std::string* out_path) -> Result<std::shared_ptr<fs::FileSystem>> {
RETURN_NOT_OK(EnsureS3Initialized());
ARROW_ASSIGN_OR_RAISE(auto options, S3Options::FromUri(uri, out_path));
return S3FileSystem::Make(options, io_context);
},
[] { DCHECK_OK(EnsureS3Finalized()); });

} // namespace arrow::fs
18 changes: 7 additions & 11 deletions cpp/src/arrow/filesystem/s3fs.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,16 @@
#include "arrow/util/macros.h"
#include "arrow/util/uri.h"

namespace Aws {
namespace Auth {

namespace Aws::Auth {
class AWSCredentialsProvider;
class STSAssumeRoleCredentialsProvider;
} // namespace Aws::Auth

} // namespace Auth
namespace STS {
namespace Aws::STS {
class STSClient;
}
} // namespace Aws
} // namespace Aws::STS

namespace arrow {
namespace fs {
namespace arrow::fs {

/// Options for using a proxy for S3
struct ARROW_EXPORT S3ProxyOptions {
Expand Down Expand Up @@ -277,6 +273,7 @@ class ARROW_EXPORT S3FileSystem : public FileSystem {

bool Equals(const FileSystem& other) const override;
Result<std::string> PathFromUri(const std::string& uri_string) const override;
Result<std::string> MakeUri(std::string path) const override;

/// \cond FALSE
using FileSystem::CreateDir;
Expand Down Expand Up @@ -418,5 +415,4 @@ Status EnsureS3Finalized();
ARROW_EXPORT
Result<std::string> ResolveS3BucketRegion(const std::string& bucket);

} // namespace fs
} // namespace arrow
} // namespace arrow::fs
18 changes: 18 additions & 0 deletions cpp/src/arrow/filesystem/s3fs_module.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "arrow/filesystem/filesystem_library.h"
Loading

0 comments on commit 39e2e0e

Please sign in to comment.