From 8985ba566e003af342bdb9466fc875acc6862b5d Mon Sep 17 00:00:00 2001 From: Julien Viet Date: Thu, 21 Sep 2023 11:45:42 +0200 Subject: [PATCH] The recent changes in worker pool affected worker verticles with a regression: the named verticle worker pool is closed before undeploying the verticle which leads to have the worker verticle undeployment fail when the worker pool is used only by this verticle. Close the worker pool after the verticle has been deployed instead of before. --- .../io/vertx/core/impl/DeploymentManager.java | 6 ++-- .../io/vertx/core/NamedWorkerPoolTest.java | 30 +++++++++++++++++-- 2 files changed, 30 insertions(+), 6 deletions(-) diff --git a/src/main/java/io/vertx/core/impl/DeploymentManager.java b/src/main/java/io/vertx/core/impl/DeploymentManager.java index 38303f8b4ba..6cb9fc0c74b 100644 --- a/src/main/java/io/vertx/core/impl/DeploymentManager.java +++ b/src/main/java/io/vertx/core/impl/DeploymentManager.java @@ -326,9 +326,6 @@ public synchronized Future doUndeploy(ContextInternal undeployingContext) status = ST_UNDEPLOYING; return doUndeployChildren(undeployingContext).compose(v -> doUndeploy(undeployingContext)); } else { - if (workerPool != null) { - workerPool.close(); - } status = ST_UNDEPLOYED; List> undeployFutures = new ArrayList<>(); if (parent != null) { @@ -367,6 +364,9 @@ public synchronized Future doUndeploy(ContextInternal undeployingContext) Promise resolvingPromise = undeployingContext.promise(); Future.all(undeployFutures).mapEmpty().onComplete(resolvingPromise); Future fut = resolvingPromise.future(); + if (workerPool != null) { + fut = fut.andThen(ar -> workerPool.close()); + } Handler handler = undeployHandler; if (handler != null) { undeployHandler = null; diff --git a/src/test/java/io/vertx/core/NamedWorkerPoolTest.java b/src/test/java/io/vertx/core/NamedWorkerPoolTest.java index e1b64b50c57..6d562f8b2bb 100644 --- a/src/test/java/io/vertx/core/NamedWorkerPoolTest.java +++ b/src/test/java/io/vertx/core/NamedWorkerPoolTest.java @@ -26,6 +26,7 @@ import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import static java.util.concurrent.TimeUnit.*; @@ -298,12 +299,12 @@ public void start() throws Exception { } @Test - public void testDeployUsingNamedPool() throws Exception { + public void testDeployUsingNamedPool() { AtomicReference thread = new AtomicReference<>(); String poolName = "vert.x-" + TestUtils.randomAlphaString(10); vertx.deployVerticle(new AbstractVerticle() { @Override - public void start() throws Exception { + public void start() { vertx.executeBlocking(fut -> { thread.set(Thread.currentThread()); assertTrue(Context.isOnVertxThread()); @@ -312,13 +313,36 @@ public void start() throws Exception { assertTrue(Thread.currentThread().getName().startsWith(poolName + "-")); fut.complete(); }, onSuccess(v -> { - vertx.undeploy(context.deploymentID()); + vertx.undeploy(context.deploymentID()).onComplete(ar -> { + System.out.println("UNDEPLOYED " + ar.succeeded()); + }); })); } }, new DeploymentOptions().setWorkerPoolName(poolName), onSuccess(v -> {})); assertWaitUntil(() -> thread.get() != null && thread.get().getState() == Thread.State.TERMINATED); } + @Test + public void testNamedWorkerPoolShouldBeClosedAfterVerticleIsUndeployed() { + AtomicReference threadName = new AtomicReference<>(); + vertx.deployVerticle(new AbstractVerticle() { + @Override + public void start() { + } + @Override + public void stop() { + threadName.set(Thread.currentThread().getName()); + } + }, new DeploymentOptions().setWorker(true).setWorkerPoolName("test-worker"), onSuccess(id -> { + vertx.undeploy(id).onComplete(onSuccess(v -> { + assertNotNull(threadName.get()); + assertTrue(threadName.get().startsWith("test-worker")); + testComplete(); + })); + })); + await(); + } + @Test public void testDeployUsingNamedWorkerDoesNotCreateExtraEventLoop() { int instances = getOptions().getEventLoopPoolSize();