Skip to content

Commit

Permalink
Fix stdout concurrency, add option --num-threads (#98)
Browse files Browse the repository at this point in the history
* fix concurrency issue on output to stdout

* add option --num-threads, set it to 1 in E2E tests to avoid concurrency issues with the std::stringstream buffer used to capture stdout and stderr

* add separate actions for docker build with BZ2 compressed file output, uncompressed file output, and stdout output, e2e tests for uncompressed file out, and output to stdout (which is always uncompressed)

* limit number of threads to 1 in remaining tests
  • Loading branch information
patrickbr authored Oct 18, 2024
1 parent 1271fb1 commit ddb1bbc
Show file tree
Hide file tree
Showing 18 changed files with 376 additions and 54 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
name: End-to-end test (docker build)
name: End-to-end test, BZ2 output (docker build)

on:
push:
Expand All @@ -13,15 +13,15 @@ concurrency:

jobs:
build:
name: End-to-end test (docker build)
name: End-to-end test, BZ2 output (docker build)
runs-on: ubuntu-22.04
strategy:
fail-fast: false
matrix:
dockerfile: [ Dockerfile]

steps:
- name: Checkout the repository
- name: Checkout the repository
uses: actions/checkout@v3
with:
submodules: "recursive"
Expand Down
57 changes: 57 additions & 0 deletions .github/workflows/docker-build-stdout.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
name: End-to-end test, stdout output (docker build)

on:
push:
branches: [ master ]
pull_request:
branches: [ master ]
merge_group:

concurrency:
group: '${{ github.workflow }} @ ${{ github.event.pull_request.head.label || github.head_ref || github.ref }}'
cancel-in-progress: true

jobs:
build:
name: End-to-end test, stdout output (docker build)
runs-on: ubuntu-22.04
strategy:
fail-fast: false
matrix:
dockerfile: [ Dockerfile]

steps:
- name: Checkout the repository
uses: actions/checkout@v3
with:
submodules: "recursive"

- name: Build the docker image
run: |
set -v
docker build -f ${{matrix.dockerfile}} -t osm2rdf .
docker run --rm osm2rdf --help
- name: Build TTL for Malta and check its validity
run: |
set -v
mkdir osm-malta && cd $_
curl -L -o osm-malta.pbf https://download.geofabrik.de/europe/malta-latest.osm.pbf
ls -l osm-malta.pbf
docker run --rm -v $(pwd):/data osm2rdf /data/osm-malta.pbf > osm-malta.ttl
ls -l osm-malta.pbf osm-malta.ttl
docker run --rm -v $(pwd):/data stain/jena riot --validate /data/osm-malta.ttl
- name: Build QLever index and count the number of geometries
run: |
set -v
cd osm-malta
docker run -u $(id -u):$(id -g) -v $(pwd):/data -w /data --entrypoint bash adfreiburg/qlever -c "cat osm-malta.ttl | IndexBuilderMain -F ttl -f - -i osm-malta"
docker run -d -p 7000:7000 -v $(pwd):/data -w /data --entrypoint bash --name qlever adfreiburg/qlever -c "ServerMain -i /data/osm-malta -p 7000"
sleep 5
docker logs qlever
RESULT_JSON=$(curl http://localhost:7000 --data-urlencode "query=PREFIX geo: <http://www.opengis.net/ont/geosparql#> SELECT (COUNT(?geometry) AS ?count) WHERE { ?osm_id geo:hasGeometry ?geometry }")
echo "${RESULT_JSON}"
NUM_GEOMS=$(echo "${RESULT_JSON}" | jq --exit-status --raw-output .results.bindings[0].count.value)
echo ${NUM_GEOMS} | numfmt --grouping
test ${NUM_GEOMS} -gt 100000
57 changes: 57 additions & 0 deletions .github/workflows/docker-build-uncompressed.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
name: End-to-end test, uncompressed output (docker build)

on:
push:
branches: [ master ]
pull_request:
branches: [ master ]
merge_group:

concurrency:
group: '${{ github.workflow }} @ ${{ github.event.pull_request.head.label || github.head_ref || github.ref }}'
cancel-in-progress: true

jobs:
build:
name: End-to-end test, uncompressed output (docker build)
runs-on: ubuntu-22.04
strategy:
fail-fast: false
matrix:
dockerfile: [ Dockerfile]

steps:
- name: Checkout the repository
uses: actions/checkout@v3
with:
submodules: "recursive"

- name: Build the docker image
run: |
set -v
docker build -f ${{matrix.dockerfile}} -t osm2rdf .
docker run --rm osm2rdf --help
- name: Build TTL for Malta and check its validity
run: |
set -v
mkdir osm-malta && cd $_
curl -L -o osm-malta.pbf https://download.geofabrik.de/europe/malta-latest.osm.pbf
ls -l osm-malta.pbf
docker run --rm -v $(pwd):/data osm2rdf /data/osm-malta.pbf --output-no-compress -o /data/osm-malta.ttl
ls -l osm-malta.pbf osm-malta.ttl
docker run --rm -v $(pwd):/data stain/jena riot --validate /data/osm-malta.ttl
- name: Build QLever index and count the number of geometries
run: |
set -v
cd osm-malta
docker run -u $(id -u):$(id -g) -v $(pwd):/data -w /data --entrypoint bash adfreiburg/qlever -c "cat osm-malta.ttl | IndexBuilderMain -F ttl -f - -i osm-malta"
docker run -d -p 7000:7000 -v $(pwd):/data -w /data --entrypoint bash --name qlever adfreiburg/qlever -c "ServerMain -i /data/osm-malta -p 7000"
sleep 5
docker logs qlever
RESULT_JSON=$(curl http://localhost:7000 --data-urlencode "query=PREFIX geo: <http://www.opengis.net/ont/geosparql#> SELECT (COUNT(?geometry) AS ?count) WHERE { ?osm_id geo:hasGeometry ?geometry }")
echo "${RESULT_JSON}"
NUM_GEOMS=$(echo "${RESULT_JSON}" | jq --exit-status --raw-output .results.bindings[0].count.value)
echo ${NUM_GEOMS} | numfmt --grouping
test ${NUM_GEOMS} -gt 100000
2 changes: 2 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ add_compile_options(-DGTEST_HAS_TR1_TUPLE=0 -DGTEST_USE_OWN_TR1_TUPLE=0)

add_compile_options(-march=native)

set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -fsanitize=address,undefined")

# Enable fast-math
add_compile_options(-ffast-math)

Expand Down
8 changes: 8 additions & 0 deletions apps/osm2rdf.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@
#include "osm2rdf/util/Ram.h"
#include "osm2rdf/util/Time.h"

#if defined(_OPENMP)
#include "omp.h"
#endif

// ____________________________________________________________________________
template <typename T>
void run(const osm2rdf::config::Config& config) {
Expand Down Expand Up @@ -71,6 +75,10 @@ int main(int argc, char** argv) {
(osm2rdf::util::ram::GIGA * 1.0)
<< "G" << std::endl;

#if defined(_OPENMP)
omp_set_num_threads(config.numThreads);
#endif

try {
if (config.outputFormat == "qlever") {
run<osm2rdf::ttl::format::QLEVER>(config);
Expand Down
3 changes: 3 additions & 0 deletions include/osm2rdf/config/Config.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <string>
#include <unordered_set>
#include <vector>
#include <thread>

#include "osm2rdf/config/Constants.h"
#include "osm2rdf/ttl/Format.h"
Expand Down Expand Up @@ -69,6 +70,8 @@ struct Config {
bool addWayNodeSpatialMetadata = false;
bool skipWikiLinks = false;

int numThreads = std::thread::hardware_concurrency();

// Default settings for data
std::unordered_set<std::string> semicolonTagKeys;

Expand Down
12 changes: 9 additions & 3 deletions include/osm2rdf/config/Constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -284,9 +284,15 @@ const static inline std::string SEMICOLON_TAG_KEYS_OPTION_HELP = "";
const static inline std::string AUX_GEO_FILES_INFO =
"Auxiliary geo files for computing spatial relations";
const static inline std::string AUX_GEO_FILES_OPTION_SHORT = "";
const static inline std::string AUX_GEO_FILES_OPTION_LONG =
"aux-geo-files";
const static inline std::string AUX_GEO_FILES_OPTION_HELP = "";
const static inline std::string AUX_GEO_FILES_OPTION_LONG = "aux-geo-files";
const static inline std::string AUX_GEO_FILES_OPTION_HELP =
"Auxiliary geo files for computing spatial relations";

const static inline std::string NUM_THREADS_INFO = "Number of threads to use";
const static inline std::string NUM_THREADS_OPTION_SHORT = "";
const static inline std::string NUM_THREADS_OPTION_LONG = "num-threads";
const static inline std::string NUM_THREADS_OPTION_HELP =
"Number of threads to use";

const static inline std::string WKT_PRECISION_INFO =
"Dumping WKT with precision: ";
Expand Down
10 changes: 10 additions & 0 deletions src/config/Config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,8 @@ std::string osm2rdf::config::Config::getInfo(std::string_view prefix) const {
}
}
oss << "\n" << prefix << osm2rdf::config::constants::SECTION_MISCELLANEOUS;
oss << "\n"
<< prefix << "Num Threads: " << numThreads;

if (!storeLocationsOnDisk.empty()) {
oss << "\n"
Expand Down Expand Up @@ -294,6 +296,12 @@ void osm2rdf::config::Config::fromArgs(int argc, char** argv) {
osm2rdf::config::constants::AUX_GEO_FILES_OPTION_LONG,
osm2rdf::config::constants::AUX_GEO_FILES_OPTION_HELP);

auto numThreadsOp =
parser.add<popl::Value<int>, popl::Attribute::advanced>(
osm2rdf::config::constants::NUM_THREADS_OPTION_SHORT,
osm2rdf::config::constants::NUM_THREADS_OPTION_LONG,
osm2rdf::config::constants::NUM_THREADS_OPTION_HELP, numThreads);

auto semicolonTagKeysOp =
parser.add<popl::Value<std::string>, popl::Attribute::advanced>(
osm2rdf::config::constants::SEMICOLON_TAG_KEYS_OPTION_SHORT,
Expand Down Expand Up @@ -458,6 +466,8 @@ void osm2rdf::config::Config::fromArgs(int argc, char** argv) {
}
}

if (numThreadsOp->is_set()) numThreads = numThreadsOp->value();

writeRDFStatistics = writeRDFStatisticsOp->is_set();

// Output
Expand Down
8 changes: 4 additions & 4 deletions src/osm/GeometryHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ GeometryHandler<W>::GeometryHandler(const osm2rdf::config::Config& config,
: _config(config),
_writer(writer),
_sweeper(
{static_cast<size_t>(omp_get_max_threads()),
static_cast<size_t>(omp_get_max_threads()),
{static_cast<size_t>(config.numThreads),
static_cast<size_t>(config.numThreads),
"",
osm2rdf::ttl::constants::IRI__OPENGIS_INTERSECTS,
osm2rdf::ttl::constants::IRI__OPENGIS_CONTAINS,
Expand All @@ -85,7 +85,7 @@ GeometryHandler<W>::GeometryHandler(const osm2rdf::config::Config& config,
{},
[this](size_t progr) { this->progressCb(progr); }},
config.cache, ""),
_parseBatches(omp_get_max_threads()) {}
_parseBatches(config.numThreads) {}

// ___________________________________________________________________________
template <typename W>
Expand Down Expand Up @@ -275,7 +275,7 @@ void GeometryHandler<W>::calculateRelations() {
}

::util::JobQueue<ParseBatch> jobs(1000); // the WKT parse jobs
std::vector<std::thread> thrds(omp_get_max_threads()); // the parse workers
std::vector<std::thread> thrds(_config.numThreads); // the parse workers
for (size_t i = 0; i < thrds.size(); i++)
thrds[i] = std::thread(&processQueue, &jobs, i, &_sweeper);

Expand Down
25 changes: 17 additions & 8 deletions src/osm/OsmiumHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@
#include "osmium/io/reader_with_progress_bar.hpp"
#include "osmium/util/memory.hpp"

#if defined(_OPENMP)
#include "omp.h"
#endif

// ____________________________________________________________________________
template <typename W>
osm2rdf::osm::OsmiumHandler<W>::OsmiumHandler(
Expand Down Expand Up @@ -62,9 +66,7 @@ void osm2rdf::osm::OsmiumHandler<W>::handle() {
osmium::osm_entity_bits::object};
{
while (auto buf = reader.read()) {
osmium::apply(buf, mp_manager,
_relationHandler,
countHandler);
osmium::apply(buf, mp_manager, _relationHandler, countHandler);
}
}
reader.close();
Expand All @@ -79,7 +81,15 @@ void osm2rdf::osm::OsmiumHandler<W>::handle() {
std::cerr << std::endl;
std::cerr << osm2rdf::util::currentTimeFormatted()
<< "OSM Pass 2 ... (dump)" << std::endl;
osmium::io::Reader reader{input_file, osmium::osm_entity_bits::object};
osmium::thread::Pool pool(std::max(_config.numThreads - 2, 1),
osmium::thread::Pool::default_queue_size);

#if defined(_OPENMP)
omp_set_num_threads(_config.numThreads);
#endif

osmium::io::Reader reader{input_file, osmium::osm_entity_bits::object,
pool};
osm2rdf::osm::LocationHandler* locationHandler =
osm2rdf::osm::LocationHandler::create(_config);
_relationHandler.setLocationHandler(locationHandler);
Expand All @@ -99,10 +109,10 @@ void osm2rdf::osm::OsmiumHandler<W>::handle() {
numTasks += countHandler.numRelations();
}
if (!_config.noFacts && !_config.noWayFacts) {
numTasks += countHandler.numWays();
numTasks += countHandler.numWays();
}
if (!_config.noGeometricRelations && !_config.noWayGeometricRelations) {
numTasks += countHandler.numWays();
numTasks += countHandler.numWays();
}

_progressBar = osm2rdf::util::ProgressBar{numTasks, true};
Expand All @@ -114,8 +124,7 @@ void osm2rdf::osm::OsmiumHandler<W>::handle() {
{
while (auto buf = reader.read()) {
osmium::apply(
buf, *locationHandler,
_relationHandler,
buf, *locationHandler, _relationHandler,
mp_manager.handler([&](osmium::memory::Buffer&& buffer) {
osmium::apply(buffer, *this);
}),
Expand Down
7 changes: 1 addition & 6 deletions src/ttl/Writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -179,12 +179,7 @@ osm2rdf::ttl::Writer<T>::Writer(const osm2rdf::config::Config& config,
osm2rdf::ttl::constants::LITERAL__YES = generateLiteral("yes", "");

// Prepare statistic variables
#if defined(_OPENMP)
_numOuts = std::max(std::thread::hardware_concurrency(),
static_cast<unsigned int>(omp_get_max_threads()) + 1);
#else
_numOuts = std::thread::hardware_concurrency() + 1;
#endif
_numOuts = config.numThreads + 1;
_blankNodeCount = new uint64_t[_numOuts];
_headerLines = new uint64_t[_numOuts];
_lineCount = new uint64_t[_numOuts];
Expand Down
Loading

0 comments on commit ddb1bbc

Please sign in to comment.