Skip to content

Commit

Permalink
Fix page-migration background thread on fork (#31)
Browse files Browse the repository at this point in the history
* Fix page-migration background thread on fork

After falling off main in the forked child, all the children
try to join on the parent's monitoring thread. This results
in a deadlock. Parent is waiting for the child to exit, but
the child is trying to join the parent's thread which is
signaled from the parent's static destructors.

Even with just one parent and child, due to copy-on-write
semantics, a child signalling the background thread to join
will still block (thread's updated state is not visible
in the child).

This fix creates background threads on fork per-child with a
pthread_atfork handler, ensuring that each child has its own
monitoring thread.

* Formatting fixes

* Detach page-migration background thread and update test timeout

* Attach files with ctest

* Update corr-id assert

* Tweak on-fork, simplify background thread

* Revert thread detach
  • Loading branch information
MythreyaK authored Dec 6, 2024
1 parent fc25138 commit e7d4562
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 75 deletions.
138 changes: 72 additions & 66 deletions source/lib/rocprofiler-sdk/page_migration/page_migration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -539,11 +539,14 @@ handle_reporting(std::string_view event_data)

} // namespace

// KFD utils
namespace kfd
namespace
{
void poll_events(small_vector<pollfd>);
}

// KFD utils
namespace kfd
{
using fd_flags_t = decltype(EFD_NONBLOCK);
using fd_t = decltype(pollfd::fd);
constexpr auto KFD_DEVICE_PATH{"/dev/kfd"};
Expand Down Expand Up @@ -619,17 +622,18 @@ struct poll_kfd_t
const rocprofiler_agent_t* agent = nullptr;
};

kfd_device_fd kfd_fd = {};
small_vector<pollfd> file_handles = {};
pollfd thread_notify = {};
std::thread bg_thread = {};
bool active = {false};
kfd_device_fd kfd_fd = {};
pollfd thread_notify = {};
std::thread bg_thread = {};
bool active = {false};

poll_kfd_t() = default;

poll_kfd_t(const small_vector<size_t>& rprof_ev)
: kfd_fd{kfd_device_fd{}}
{
small_vector<pollfd> file_handles = {};

const auto kfd_flags =
kfd_bitmask(rprof_ev, std::make_index_sequence<ROCPROFILER_PAGE_MIGRATION_LAST>{});

Expand Down Expand Up @@ -703,7 +707,24 @@ struct poll_kfd_t
poll_kfd_t(poll_kfd_t&&) = default;
poll_kfd_t& operator=(poll_kfd_t&&) = default;

~poll_kfd_t();
/// Stops the background polling thread (if this object still owns one) and
/// closes the notification descriptor held in `thread_notify`.
///
/// Shutdown sequence: write one byte to `thread_notify.fd` to wake the thread,
/// wait for the thread to finish, then close `thread_notify.fd`.
~poll_kfd_t()
{
    ROCP_TRACE << fmt::format("Terminating poll_kfd\n");

    // Nothing to tear down unless this object is active AND still owns a running thread.
    // The defaulted move operations copy `active` but leave the source's `bg_thread`
    // non-joinable: joining it would throw std::system_error out of this destructor
    // (terminating the process), and the descriptor would belong to the new owner.
    if(!active || !bg_thread.joinable()) return;

    // wake thread up; retry while the write is interrupted or would block
    ssize_t bytes_written = -1;
    do
    {
        bytes_written = write(thread_notify.fd, "E", 1);
    } while(bytes_written == -1 && (errno == EINTR || errno == EAGAIN));

    bg_thread.join();

    // only close after the thread has finished, so it never polls a closed descriptor
    close(thread_notify.fd);

    ROCP_TRACE << fmt::format("Background thread signalled\n");
}

node_fd_t get_node_fd(int gpu_node_id) const
{
Expand All @@ -716,48 +737,36 @@ struct poll_kfd_t
return args.anon_fd;
}
};
} // namespace
} // namespace kfd

// for all contexts
struct page_migration_config
struct config
{
bool should_exit() const { return m_should_exit.load(); }
void set_exit(bool val) { m_should_exit.store(val); }

uint64_t enabled_events = 0;
kfd::poll_kfd_t* kfd_handle = nullptr;

private:
std::atomic<bool> m_should_exit = false;
};
kfd::poll_kfd_t kfd_handle{};

page_migration_config&
get_config()
{
static auto& state = *common::static_object<page_migration_config>::construct();
return state;
}
static inline config* _config{nullptr};

kfd::poll_kfd_t::~poll_kfd_t()
{
ROCP_TRACE << fmt::format("Terminating poll_kfd\n");
if(!active) return;
config(const small_vector<size_t>& _event_ids)
: kfd_handle{_event_ids}
{}

// wake thread up
kfd::get_config().set_exit(true);
auto bytes_written{-1};
do
{
bytes_written = write(thread_notify.fd, "E", 1);
} while(bytes_written == -1 && (errno == EINTR || errno == EAGAIN));
public:
/// Creates the process-wide configuration for the given event ids.
///
/// Does nothing when a configuration already exists, so a repeated call cannot
/// overwrite (and thereby leak) the previous instance. Call reset() or
/// reset_on_fork() first when a fresh instance is required.
static void init(const small_vector<size_t>& event_ids)
{
    if(_config == nullptr) _config = new config{event_ids};
}

if(bg_thread.joinable()) bg_thread.join();
ROCP_TRACE << fmt::format("Background thread terminated\n");
static void reset()
{
config* ptr = nullptr;
std::swap(ptr, _config);
delete ptr;
}

for(const auto& f : file_handles)
close(f.fd);
}
} // namespace
/// Forgets the current configuration without destroying it.
/// Used by the fork child handler: the inherited instance is dropped rather than
/// deleted, so the child never runs the destructor that joins the background thread.
static void reset_on_fork()
{
    _config = nullptr;
}
};

namespace
{
void
poll_events(small_vector<pollfd> file_handles)
{
Expand All @@ -778,7 +787,7 @@ poll_events(small_vector<pollfd> file_handles)
"Handle = {}, events = {}, revents = {}\n", fd.fd, fd.events, fd.revents);
}

while(!kfd::get_config().should_exit())
while(true)
{
auto poll_ret = poll(file_handles.data(), file_handles.size(), -1);

Expand All @@ -787,6 +796,10 @@ poll_events(small_vector<pollfd> file_handles)

if((exitfd.revents & POLLIN) != 0)
{
for(const auto& f : file_handles)
{
close(f.fd);
}
ROCP_INFO << "Terminating background thread\n";
return;
}
Expand All @@ -809,7 +822,6 @@ poll_events(small_vector<pollfd> file_handles)
}
}
}
} // namespace kfd

template <size_t Idx, size_t... IdxTail>
const char*
Expand Down Expand Up @@ -851,19 +863,17 @@ to_bitmask(small_vector<size_t>& _id_list, std::index_sequence<Idx...>)
(_emplace(_id_list, page_migration_info<Idx>::operation), ...);
}

namespace
{
rocprofiler_status_t
init(const small_vector<size_t>& event_ids)
template <size_t... Inxs>
rocprofiler_status_t init(std::index_sequence<Inxs...>)
{
static const small_vector<size_t> event_ids{Inxs...};
// Check if version is more than 1.11
auto ver = kfd::get_version();
if(ver.major_version * 1000 + ver.minor_version > 1011)
{
if(!context::get_registered_contexts(context_filter).empty())
{
if(!kfd::get_config().kfd_handle)
kfd::get_config().kfd_handle = new kfd::poll_kfd_t{event_ids};
config::init(event_ids);
}
return ROCPROFILER_STATUS_SUCCESS;
}
Expand All @@ -879,31 +889,28 @@ init(const small_vector<size_t>& event_ids)
}
} // namespace

} // namespace page_migration
} // namespace rocprofiler

namespace rocprofiler::page_migration
{
rocprofiler_status_t
init()
{
// Testing page migration
return init({
ROCPROFILER_PAGE_MIGRATION_PAGE_MIGRATE_START,
ROCPROFILER_PAGE_MIGRATION_PAGE_MIGRATE_END,
ROCPROFILER_PAGE_MIGRATION_PAGE_FAULT_START,
ROCPROFILER_PAGE_MIGRATION_PAGE_FAULT_END,
ROCPROFILER_PAGE_MIGRATION_QUEUE_EVICTION,
ROCPROFILER_PAGE_MIGRATION_QUEUE_RESTORE,
ROCPROFILER_PAGE_MIGRATION_UNMAP_FROM_GPU,
ROCPROFILER_PAGE_MIGRATION_DROPPED_EVENT,
pthread_atfork(nullptr, nullptr, []() {
// null out child's copy on fork and reinitialize
// otherwise all children wait on the same thread to join
config::reset_on_fork();
init(std::make_index_sequence<ROCPROFILER_PAGE_MIGRATION_LAST>{});
});

return init(std::make_index_sequence<ROCPROFILER_PAGE_MIGRATION_LAST>{});
}

void
finalize()
{
if(kfd::get_config().kfd_handle)
{
kfd::poll_kfd_t* _handle = nullptr;
std::swap(kfd::get_config().kfd_handle, _handle);
delete _handle;
}
config::reset();
}

const char*
Expand All @@ -920,5 +927,4 @@ get_ids()
get_ids(_data, std::make_index_sequence<ROCPROFILER_PAGE_MIGRATION_LAST>{});
return _data;
}
} // namespace page_migration
} // namespace rocprofiler
} // namespace rocprofiler::page_migration
10 changes: 7 additions & 3 deletions tests/page-migration/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ set(page-migration-env
set_tests_properties(
test-page-migration-execute
PROPERTIES TIMEOUT
45
60
LABELS
"integration-tests"
ENVIRONMENT
Expand All @@ -38,7 +38,9 @@ set_tests_properties(
SKIP_REGULAR_EXPRESSION
"KFD does not support SVM event reporting"
WORKING_DIRECTORY
${CMAKE_CURRENT_BINARY_DIR})
${CMAKE_CURRENT_BINARY_DIR}
ATTACHED_FILES
${CMAKE_CURRENT_BINARY_DIR}/page-migration-test.json)

# copy to binary directory
rocprofiler_configure_pytest_files(COPY validate.py conftest.py CONFIG pytest.ini)
Expand All @@ -60,4 +62,6 @@ set_tests_properties(
SKIP_REGULAR_EXPRESSION
"KFD does not support SVM event reporting"
WORKING_DIRECTORY
${CMAKE_CURRENT_BINARY_DIR})
${CMAKE_CURRENT_BINARY_DIR}
ATTACHED_FILES
${CMAKE_CURRENT_BINARY_DIR}/page-migration-test.json)
19 changes: 13 additions & 6 deletions tests/page-migration/validate.py
Original file line number Diff line number Diff line change
Expand Up @@ -243,16 +243,23 @@ def _sort_dict(inp):
api_corr_ids = _sort_dict(api_corr_ids)
async_corr_ids = _sort_dict(async_corr_ids)
retired_corr_ids = _sort_dict(retired_corr_ids)
missing_corr_ids = {}

for cid, itr in async_corr_ids.items():
assert cid in retired_corr_ids.keys()
ts = retired_corr_ids[cid]["timestamp"]
assert (ts - itr["end_timestamp"]) > 0, f"correlation-id: {cid}, data: {itr}"
if cid not in retired_corr_ids.keys():
missing_corr_ids[cid] = itr
else:
ts = retired_corr_ids[cid]["timestamp"]
assert (ts - itr["end_timestamp"]) > 0, f"correlation-id: {cid}, data: {itr}"

for cid, itr in api_corr_ids.items():
assert cid in retired_corr_ids.keys()
ts = retired_corr_ids[cid]["timestamp"]
assert (ts - itr["end_timestamp"]) > 0, f"correlation-id: {cid}, data: {itr}"
if cid not in retired_corr_ids.keys():
missing_corr_ids[cid] = itr
else:
ts = retired_corr_ids[cid]["timestamp"]
assert (ts - itr["end_timestamp"]) > 0, f"correlation-id: {cid}, data: {itr}"

assert len(missing_corr_ids) == 0, f"{missing_corr_ids}"

assert len(api_corr_ids.keys()) == (len(retired_corr_ids.keys()))

Expand Down

0 comments on commit e7d4562

Please sign in to comment.