Skip to content

Commit

Permalink
The recent changes in worker pool affected worker verticles with a re…
Browse files Browse the repository at this point in the history
…gression: the named verticle worker pool is closed before undeploying the verticle which leads to have the worker verticle undeployment fail when the worker pool is used only by this verticle.

Close the worker pool after the verticle has been deployed instead of before.
  • Loading branch information
vietj committed Sep 21, 2023
1 parent 28fe77b commit 8985ba5
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 6 deletions.
6 changes: 3 additions & 3 deletions src/main/java/io/vertx/core/impl/DeploymentManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -326,9 +326,6 @@ public synchronized Future<Void> doUndeploy(ContextInternal undeployingContext)
status = ST_UNDEPLOYING;
return doUndeployChildren(undeployingContext).compose(v -> doUndeploy(undeployingContext));
} else {
if (workerPool != null) {
workerPool.close();
}
status = ST_UNDEPLOYED;
List<Future<?>> undeployFutures = new ArrayList<>();
if (parent != null) {
Expand Down Expand Up @@ -367,6 +364,9 @@ public synchronized Future<Void> doUndeploy(ContextInternal undeployingContext)
Promise<Void> resolvingPromise = undeployingContext.promise();
Future.all(undeployFutures).<Void>mapEmpty().onComplete(resolvingPromise);
Future<Void> fut = resolvingPromise.future();
if (workerPool != null) {
fut = fut.andThen(ar -> workerPool.close());
}
Handler<Void> handler = undeployHandler;
if (handler != null) {
undeployHandler = null;
Expand Down
30 changes: 27 additions & 3 deletions src/test/java/io/vertx/core/NamedWorkerPoolTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import static java.util.concurrent.TimeUnit.*;
Expand Down Expand Up @@ -298,12 +299,12 @@ public void start() throws Exception {
}

@Test
public void testDeployUsingNamedPool() throws Exception {
public void testDeployUsingNamedPool() {
AtomicReference<Thread> thread = new AtomicReference<>();
String poolName = "vert.x-" + TestUtils.randomAlphaString(10);
vertx.deployVerticle(new AbstractVerticle() {
@Override
public void start() throws Exception {
public void start() {
vertx.executeBlocking(fut -> {
thread.set(Thread.currentThread());
assertTrue(Context.isOnVertxThread());
Expand All @@ -312,13 +313,36 @@ public void start() throws Exception {
assertTrue(Thread.currentThread().getName().startsWith(poolName + "-"));
fut.complete();
}, onSuccess(v -> {
vertx.undeploy(context.deploymentID());
vertx.undeploy(context.deploymentID()).onComplete(ar -> {
System.out.println("UNDEPLOYED " + ar.succeeded());
});
}));
}
}, new DeploymentOptions().setWorkerPoolName(poolName), onSuccess(v -> {}));
assertWaitUntil(() -> thread.get() != null && thread.get().getState() == Thread.State.TERMINATED);
}

@Test
public void testNamedWorkerPoolShouldBeClosedAfterVerticleIsUndeployed() {
AtomicReference<String> threadName = new AtomicReference<>();
vertx.deployVerticle(new AbstractVerticle() {
@Override
public void start() {
}
@Override
public void stop() {
threadName.set(Thread.currentThread().getName());
}
}, new DeploymentOptions().setWorker(true).setWorkerPoolName("test-worker"), onSuccess(id -> {
vertx.undeploy(id).onComplete(onSuccess(v -> {
assertNotNull(threadName.get());
assertTrue(threadName.get().startsWith("test-worker"));
testComplete();
}));
}));
await();
}

@Test
public void testDeployUsingNamedWorkerDoesNotCreateExtraEventLoop() {
int instances = getOptions().getEventLoopPoolSize();
Expand Down

0 comments on commit 8985ba5

Please sign in to comment.