Skip to content

Commit

Permalink
Add framework to let raster tiles complete work before parent tile
Browse files Browse the repository at this point in the history
  • Loading branch information
csciguy8 committed Jan 12, 2024
1 parent ead87a8 commit 74df410
Show file tree
Hide file tree
Showing 4 changed files with 153 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ struct TileLoadWork {

ResponseDataMap responsesByUrl;

std::vector<TileLoadWork> childWork;

bool operator<(const TileLoadWork& rhs) const noexcept {
if (this->group == rhs.group)
return this->priority < rhs.priority;
Expand Down Expand Up @@ -62,6 +64,8 @@ class TileWorkManager {
std::vector<TileLoadWork>& outCompleted,
std::vector<TileLoadWork>& outFailed);

void SignalWorkComplete(const TileLoadWork& work);

size_t GetPendingRequestsCount();
size_t GetTotalPendingCount();

Expand All @@ -85,6 +89,10 @@ class TileWorkManager {
bool isRequestAlreadyInFlight(const TileLoadWork& newRequest);
bool isWorkAlreadyProcessing(const TileLoadWork& newProcessing);

void eraseMatchingChildWork(
const TileLoadWork& work,
std::vector<TileLoadWork>& childWork);

// Thread safe members
std::mutex _requestsLock;
bool _exitSignaled = false;
Expand Down
101 changes: 83 additions & 18 deletions Cesium3DTilesSelection/src/TileWorkManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,8 @@ void TileWorkManager::QueueBatch(
std::lock_guard<std::mutex> lock(_requestsLock);

for (TileLoadWork* element : requestWork) {
if (isRequestAlreadyQueued(*element))
continue;
if (isRequestAlreadyInFlight(*element))
continue;
assert(!isRequestAlreadyQueued(*element));
assert(!isRequestAlreadyInFlight(*element));
_requestQueue.push_back(std::move(*element));
}

Expand All @@ -112,15 +110,70 @@ void TileWorkManager::QueueBatch(
void TileWorkManager::QueueSingleRequest(const TileLoadWork& requestWork) {
{
std::lock_guard<std::mutex> lock(_requestsLock);

// TODO - This needs to be an assertion
if (!isRequestAlreadyQueued(requestWork) &&
!isRequestAlreadyInFlight(requestWork)) {
!isRequestAlreadyInFlight(requestWork))
_requestQueue.push_back(std::move(requestWork));
}
}

transitionQueuedWork();
}

void TileWorkManager::eraseMatchingChildWork(
const TileLoadWork& work,
std::vector<TileLoadWork>& childWork) {
std::vector<TileLoadWork>::iterator childIt;
for (childIt = childWork.begin(); childIt != childWork.end(); ++childIt) {
bool baseWorkEqual = childIt->childWork.size() == work.childWork.size() &&
childIt->group == work.group &&
childIt->priority == work.priority;
bool workHasTileProcessing =
std::holds_alternative<TileProcessingData>(work.processingData);
bool childHasTileProcessing =
std::holds_alternative<TileProcessingData>(childIt->processingData);

if (!baseWorkEqual || workHasTileProcessing != childHasTileProcessing)
continue;

if (workHasTileProcessing) {
TileProcessingData workTileProcessing =
std::get<TileProcessingData>(work.processingData);
TileProcessingData childTileProcessing =
std::get<TileProcessingData>(childIt->processingData);
if (workTileProcessing.pTile != childTileProcessing.pTile)
continue;
} else {
RasterProcessingData workRasterProcessing =
std::get<RasterProcessingData>(work.processingData);
RasterProcessingData childRasterProcessing =
std::get<RasterProcessingData>(childIt->processingData);
if (workRasterProcessing.pRasterTile != childRasterProcessing.pRasterTile)
continue;
}

childWork.erase(childIt);
break;
}
}

void TileWorkManager::SignalWorkComplete(const TileLoadWork& work) {
std::lock_guard<std::mutex> lock(_requestsLock);

// Look for any work whose child matches this. And remove ourselves
for (TileLoadWork& existingRequest : _requestQueue)
eraseMatchingChildWork(work, existingRequest.childWork);

std::map<std::string, std::vector<TileLoadWork>>::iterator mapIt;
for (mapIt = _inFlightRequests.begin(); mapIt != _inFlightRequests.end();
++mapIt)
for (TileLoadWork& inFlightWork : mapIt->second)
eraseMatchingChildWork(work, inFlightWork.childWork);

for (TileLoadWork& doneRequest : _processingQueue)
eraseMatchingChildWork(work, doneRequest.childWork);
}

void TileWorkManager::onRequestFinished(
uint16_t responseStatusCode,
gsl::span<const std::byte> responseBytes,
Expand Down Expand Up @@ -259,22 +312,34 @@ void TileWorkManager::TakeProcessingWork(
if (processingCount == 0)
return;

// TODO - This list should be a map so it is always sorted
// Reverse sort so highest priority is at back
std::sort(_processingQueue.rbegin(), _processingQueue.rend());

size_t numberToTake = std::min(processingCount, maxCount);

// If not taking everything, sort so more important work goes first
if (numberToTake < processingCount)
std::sort(_processingQueue.begin(), _processingQueue.end());
// Start from the back
std::vector<TileLoadWork>::iterator it = _processingQueue.end();
while (1) {
--it;

TileLoadWork& work = *it;
if (!work.childWork.empty()) {
// Can't take this work yet
// Child work has to register completion first
} else {
// Move this work to output. Erase from queue
std::vector<TileLoadWork>::iterator eraseIt = it;
outCompleted.push_back(std::move(*eraseIt));
_processingQueue.erase(eraseIt);
}

// Move work to output
for (auto workIt = _processingQueue.begin();
workIt != _processingQueue.begin() + numberToTake;
++workIt)
outCompleted.push_back(std::move(*workIt));
if (outCompleted.size() >= numberToTake)
break;

// Remove these entries from the source
_processingQueue.erase(
_processingQueue.begin(),
_processingQueue.begin() + numberToTake);
if (it == _processingQueue.begin())
break;
}
}

void TileWorkManager::transitionQueuedWork() {
Expand Down
71 changes: 48 additions & 23 deletions Cesium3DTilesSelection/src/TilesetContentManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -257,11 +257,10 @@ void createQuadtreeSubdividedChildren(

std::vector<CesiumGeospatial::Projection> mapOverlaysToTile(
Tile& tile,
size_t depthIndex,
RasterOverlayCollection& overlays,
double maximumScreenSpaceError,
const std::vector<CesiumAsync::IAssetAccessor::THeader>& defaultHeaders,
std::vector<TilesetContentManager::ParsedTileWork>& outWork) {
std::vector<TilesetContentManager::RasterWorkChain>& outWork) {
// when tile fails temporarily, it may still have mapped raster tiles, so
// clear it here
tile.getMappedRasterTiles().clear();
Expand Down Expand Up @@ -293,11 +292,11 @@ std::vector<CesiumGeospatial::Projection> mapOverlaysToTile(
pMapped->getLoadThrottledWork(requestData, rasterCallback);

if (!requestData.url.empty() || rasterCallback != nullptr) {
TilesetContentManager::ParsedTileWork newWork = {
depthIndex,
TilesetContentManager::RasterWorkChain newWorkChain = {
pMapped,
requestData,
RasterProcessingData{pMapped, rasterCallback}};
outWork.push_back(newWork);
rasterCallback};
outWork.push_back(newWorkChain);
}
}
}
Expand Down Expand Up @@ -900,13 +899,30 @@ void TilesetContentManager::discoverLoadWork(
TilesetContentManager::ParsedTileWork& work = parsedTileWork[workIndex];

double priorityBias = double(maxDepth - work.depthIndex);
double resultPriority = loadRequest.priority + priorityBias;

TileLoadWork newWorkUnit = {
work.requestData,
work.processingData,
work.tileWorkChain.requestData,
TileProcessingData{
work.tileWorkChain.pTile,
work.tileWorkChain.tileCallback},
work.projections,
loadRequest.group,
loadRequest.priority + priorityBias};
resultPriority};

for (auto rasterWorkChain : work.rasterWorkChains) {
TileLoadWork rasterWorkUnit = {
rasterWorkChain.requestData,
RasterProcessingData{
rasterWorkChain.pRasterTile,
rasterWorkChain.rasterCallback},
work.projections,
loadRequest.group,
resultPriority};

// Embed child work in parent
newWorkUnit.childWork.push_back(rasterWorkUnit);
}

outLoadWork.push_back(newWorkUnit);
}
Expand All @@ -921,11 +937,20 @@ void TilesetContentManager::addWorkToManager(

_tileWorkManager.SetMaxSimultaneousRequests(maxSimultaneousRequests);

// Expand any child work
std::vector<TileLoadWork> flattenedWork;
for (TileLoadWork& work : loadWork) {
for (TileLoadWork& child : work.childWork) {
flattenedWork.push_back(child);
}
flattenedWork.push_back(work);
}

// Request work will always go to that queue first
// Work with only processing can bypass it
std::vector<TileLoadWork*> requestWork;
std::vector<TileLoadWork*> processingWork;
for (TileLoadWork& work : loadWork) {
for (TileLoadWork& work : flattenedWork) {
if (work.requestData.url.empty())
processingWork.push_back(&work);
else
Expand Down Expand Up @@ -1103,6 +1128,8 @@ void TilesetContentManager::dispatchProcessingWork(
_work.requestData.headers = newRequestData.headers;

_this->_tileWorkManager.QueueSingleRequest(_work);
} else {
_this->_tileWorkManager.SignalWorkComplete(_work);
}

_this->notifyRasterDoneLoading();
Expand Down Expand Up @@ -1171,10 +1198,11 @@ void TilesetContentManager::parseTileWork(
rasterTile.getLoadThrottledWork(requestData, rasterCallback);

if (!requestData.url.empty() || rasterCallback != nullptr) {
TilesetContentManager::ParsedTileWork newWork = {
depthIndex,
requestData,
RasterProcessingData{&rasterTile, rasterCallback}};
// TODO - This needs a different solution for continuation
// We can't pick up with an empty tile work chain
ParsedTileWork newWork = {depthIndex};
newWork.rasterWorkChains.push_back(
RasterWorkChain{&rasterTile, requestData, rasterCallback});
outWork.push_back(newWork);
}
}
Expand Down Expand Up @@ -1234,20 +1262,17 @@ void TilesetContentManager::parseTileWork(

pLoader->getLoadWork(pTile, requestData, tileCallback);

// map raster overlay to tile
std::vector<CesiumGeospatial::Projection> projections = mapOverlaysToTile(
*pTile,
ParsedTileWork newWork = {
depthIndex,
TileWorkChain{pTile, requestData, tileCallback}};

newWork.projections = mapOverlaysToTile(
*pTile,
this->_overlayCollection,
maximumScreenSpaceError,
this->_requestHeaders,
outWork);
newWork.rasterWorkChains);

ParsedTileWork newWork = {
depthIndex,
requestData,
TileProcessingData{pTile, tileCallback},
projections};
outWork.push_back(newWork);
}

Expand Down
16 changes: 14 additions & 2 deletions Cesium3DTilesSelection/src/TilesetContentManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,24 @@ class TilesetContentManager

~TilesetContentManager() noexcept;

struct TileWorkChain {
Tile* pTile;
RequestData requestData;
TileProcessingCallback tileCallback;
};

struct RasterWorkChain {
RasterMappedTo3DTile* pRasterTile;
RequestData requestData;
RasterProcessingCallback rasterCallback;
};

struct ParsedTileWork {
size_t depthIndex;

RequestData requestData;
TileWorkChain tileWorkChain;

ProcessingData processingData;
std::vector<RasterWorkChain> rasterWorkChains;

std::vector<CesiumGeospatial::Projection> projections;

Expand Down

0 comments on commit 74df410

Please sign in to comment.