Skip to content

Commit

Permalink
Fixed Processing sometimes running processors on the main server thread
Browse files Browse the repository at this point in the history
  • Loading branch information
AuroraLS3 committed Jul 19, 2018
1 parent 38084be commit 31e04bb
Showing 1 changed file with 30 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,22 @@ public static void saveInstance(Object obj) {

public static void submitNonCritical(Runnable runnable) {
saveInstance(runnable);
CompletableFuture.supplyAsync(() -> runnable, getInstance().nonCriticalExecutor)
.thenAccept(Runnable::run)
.handle(Processing::exceptionHandler);
ExecutorService executor = getInstance().nonCriticalExecutor;
if (executor.isShutdown()) {
return;
}
CompletableFuture.supplyAsync(() -> {
runnable.run();
return true;
}, executor).handle(Processing::exceptionHandler);
}

public static void submitCritical(Runnable runnable) {
saveInstance(runnable);
CompletableFuture.supplyAsync(() -> runnable, getInstance().criticalExecutor)
.thenAccept(Runnable::run)
.handle(Processing::exceptionHandler);
CompletableFuture.supplyAsync(() -> {
runnable.run();
return true;
}, getInstance().criticalExecutor).handle(Processing::exceptionHandler);
}

public static void submitNonCritical(Runnable... runnables) {
Expand All @@ -72,15 +78,17 @@ public static <T> Future<T> submit(Callable<T> task) {

public static <T> Future<T> submitNonCritical(Callable<T> task) {
saveInstance(task);
return CompletableFuture.supplyAsync(() -> task, getInstance().nonCriticalExecutor)
.thenApply(tCallable -> {
try {
return tCallable.call();
} catch (Exception e) {
throw new IllegalStateException(e);
}
})
.handle(Processing::exceptionHandler);
ExecutorService executor = getInstance().nonCriticalExecutor;
if (executor.isShutdown()) {
return null;
}
return CompletableFuture.supplyAsync(() -> {
try {
return task.call();
} catch (Exception e) {
throw new IllegalStateException(e);
}
}, getInstance().nonCriticalExecutor).handle(Processing::exceptionHandler);
}

private static <T> T exceptionHandler(T t, Throwable throwable) {
Expand All @@ -92,15 +100,13 @@ private static <T> T exceptionHandler(T t, Throwable throwable) {

public static <T> Future<T> submitCritical(Callable<T> task) {
saveInstance(task);
return CompletableFuture.supplyAsync(() -> task, getInstance().criticalExecutor)
.thenApply(tCallable -> {
try {
return tCallable.call();
} catch (Exception e) {
throw new IllegalStateException(e);
}
})
.handle(Processing::exceptionHandler);
return CompletableFuture.supplyAsync(() -> {
try {
return task.call();
} catch (Exception e) {
throw new IllegalStateException(e);
}
}, getInstance().criticalExecutor).handle(Processing::exceptionHandler);
}

public static Processing getInstance() {
Expand Down

0 comments on commit 31e04bb

Please sign in to comment.