Skip to content

Commit

Permalink
Add logic in master service to optimize performance and retain detail…
Browse files Browse the repository at this point in the history
…ed logging for critical cluster operations.

Signed-off-by: Sumit Bansal <[email protected]>
  • Loading branch information
sumitasr committed Oct 25, 2024
1 parent ebcf5e3 commit 15d1ff3
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 47 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add _list/shards API as paginated alternate to _cat/shards ([#14641](https://github.com/opensearch-project/OpenSearch/pull/14641))
- Latency and Memory allocation improvements to Multi Term Aggregation queries ([#14993](https://github.com/opensearch-project/OpenSearch/pull/14993))
- Flat object field use IndexOrDocValuesQuery to optimize query ([#14383](https://github.com/opensearch-project/OpenSearch/issues/14383))
- Add logic in master service to optimize performance and retain detailed logging for critical cluster operations. ([#14795](https://github.com/opensearch-project/OpenSearch/pull/14795))

### Dependencies
- Bump `com.azure:azure-identity` from 1.13.0 to 1.13.2 ([#15578](https://github.com/opensearch-project/OpenSearch/pull/15578))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,33 +299,37 @@ public static boolean assertNotMasterUpdateThread(String reason) {
}

private void runTasks(TaskInputs taskInputs) {
final String longSummary = logger.isTraceEnabled() ? taskInputs.taskSummaryGenerator.apply(true) : "";
final String shortSummary = taskInputs.taskSummaryGenerator.apply(false);
final String summary;
if (logger.isTraceEnabled()) {
summary = taskInputs.taskSummaryGenerator.apply(true);
} else {
summary = taskInputs.taskSummaryGenerator.apply(false);
}

if (!lifecycle.started()) {
logger.debug("processing [{}]: ignoring, cluster-manager service not started", shortSummary);
logger.debug("processing [{}]: ignoring, cluster-manager service not started", summary);
return;
}

if (logger.isTraceEnabled()) {
logger.trace("executing cluster state update for [{}]", longSummary);
logger.trace("executing cluster state update for [{}]", summary);
} else {
logger.debug("executing cluster state update for [{}]", shortSummary);
logger.debug("executing cluster state update for [{}]", summary);
}

final ClusterState previousClusterState = state();

if (!previousClusterState.nodes().isLocalNodeElectedClusterManager() && taskInputs.runOnlyWhenClusterManager()) {
logger.debug("failing [{}]: local node is no longer cluster-manager", shortSummary);
logger.debug("failing [{}]: local node is no longer cluster-manager", summary);
taskInputs.onNoLongerClusterManager();
return;
}

final long computationStartTime = threadPool.preciseRelativeTimeInNanos();
final TaskOutputs taskOutputs = calculateTaskOutputs(taskInputs, previousClusterState, shortSummary);
final TaskOutputs taskOutputs = calculateTaskOutputs(taskInputs, previousClusterState, summary);
taskOutputs.notifyFailedTasks();
final TimeValue computationTime = getTimeSince(computationStartTime);
logExecutionTime(computationTime, "compute cluster state update", shortSummary);
logExecutionTime(computationTime, "compute cluster state update", summary);

clusterManagerMetrics.recordLatency(
clusterManagerMetrics.clusterStateComputeHistogram,
Expand All @@ -337,25 +341,25 @@ private void runTasks(TaskInputs taskInputs) {
final long notificationStartTime = threadPool.preciseRelativeTimeInNanos();
taskOutputs.notifySuccessfulTasksOnUnchangedClusterState();
final TimeValue executionTime = getTimeSince(notificationStartTime);
logExecutionTime(executionTime, "notify listeners on unchanged cluster state", shortSummary);
logExecutionTime(executionTime, "notify listeners on unchanged cluster state", summary);
} else {
final ClusterState newClusterState = taskOutputs.newClusterState;
if (logger.isTraceEnabled()) {
logger.trace("cluster state updated, source [{}]\n{}", longSummary, newClusterState);
logger.trace("cluster state updated, source [{}]\n{}", summary, newClusterState);
} else {
logger.debug("cluster state updated, version [{}], source [{}]", newClusterState.version(), shortSummary);
logger.debug("cluster state updated, version [{}], source [{}]", newClusterState.version(), summary);
}
final long publicationStartTime = threadPool.preciseRelativeTimeInNanos();
try {
ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent(shortSummary, newClusterState, previousClusterState);
ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent(summary, newClusterState, previousClusterState);
// new cluster state, notify all listeners
final DiscoveryNodes.Delta nodesDelta = clusterChangedEvent.nodesDelta();
if (nodesDelta.hasChanges() && logger.isInfoEnabled()) {
String nodesDeltaSummary = nodesDelta.shortSummary();
if (nodesDeltaSummary.length() > 0) {
logger.info(
"{}, term: {}, version: {}, delta: {}",
shortSummary,
summary,
newClusterState.term(),
newClusterState.version(),
nodesDeltaSummary
Expand All @@ -366,7 +370,7 @@ private void runTasks(TaskInputs taskInputs) {
logger.debug("publishing cluster state version [{}]", newClusterState.version());
publish(clusterChangedEvent, taskOutputs, publicationStartTime);
} catch (Exception e) {
handleException(shortSummary, publicationStartTime, newClusterState, e);
handleException(summary, publicationStartTime, newClusterState, e);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,25 +195,37 @@ void runIfNotProcessed(BatchedTask updateTask) {
if (toExecute.isEmpty() == false) {
Function<Boolean, String> taskSummaryGenerator = (longSummaryRequired) -> {
if (longSummaryRequired == null || !longSummaryRequired) {
return buildShortSummary(updateTask.batchingKey, toExecute.size());
final List<BatchedTask> sampleTasks = toExecute.stream()
.limit(Math.min(1000, toExecute.size()))
.collect(Collectors.toList());
return buildShortSummary(updateTask.batchingKey, toExecute.size(), getSummary(updateTask, sampleTasks));
}
final Map<String, List<BatchedTask>> processTasksBySource = new HashMap<>();
for (final BatchedTask task : toExecute) {
processTasksBySource.computeIfAbsent(task.source, s -> new ArrayList<>()).add(task);
}
return processTasksBySource.entrySet().stream().map(entry -> {
String tasks = updateTask.describeTasks(entry.getValue());
return tasks.isEmpty() ? entry.getKey() : entry.getKey() + "[" + tasks + "]";
}).reduce((s1, s2) -> s1 + ", " + s2).orElse("");
return getSummary(updateTask, toExecute);
};
taskBatcherListener.onBeginProcessing(toExecute);
run(updateTask.batchingKey, toExecute, taskSummaryGenerator);
}
}
}

private String buildShortSummary(final Object batchingKey, final int taskCount) {
return "Tasks batched with key: " + batchingKey.toString().split("\\$")[0] + " and count: " + taskCount;
private String getSummary(final BatchedTask updateTask, final List<BatchedTask> toExecute) {
final Map<String, List<BatchedTask>> processTasksBySource = new HashMap<>();
for (final BatchedTask task : toExecute) {
processTasksBySource.computeIfAbsent(task.source, s -> new ArrayList<>()).add(task);
}
return processTasksBySource.entrySet().stream().map(entry -> {
String tasks = updateTask.describeTasks(entry.getValue());
return tasks.isEmpty() ? entry.getKey() : entry.getKey() + "[" + tasks + "]";
}).reduce((s1, s2) -> s1 + ", " + s2).orElse("");
}

private String buildShortSummary(final Object batchingKey, final int taskCount, final String sampleTasks) {
return "Tasks batched with key: "
+ batchingKey.toString().split("\\$")[0]
+ ", count:"
+ taskCount
+ " and sample tasks: "
+ sampleTasks;
}

/**
Expand Down
Loading

0 comments on commit 15d1ff3

Please sign in to comment.