Skip to content

Commit

Permalink
eclipse-vertx#4851 ConcurrentHashSet→Utils.concurrentHashSet
Browse files Browse the repository at this point in the history
  • Loading branch information
magicprinc committed Sep 11, 2023
1 parent 280f4cf commit efc1f68
Show file tree
Hide file tree
Showing 9 changed files with 38 additions and 28 deletions.
2 changes: 2 additions & 0 deletions src/main/java/io/vertx/core/impl/ConcurrentHashSet.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
import java.util.concurrent.ConcurrentHashMap;

/**
* @deprecated use {@link Utils#concurrentHashSet}
* @author <a href="http://tfox.org">Tim Fox</a>
*/
@Deprecated
public class ConcurrentHashSet<E> implements Set<E> {

private final Map<E, Object> map;
Expand Down
7 changes: 3 additions & 4 deletions src/main/java/io/vertx/core/impl/DeploymentManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
package io.vertx.core.impl;

import io.vertx.core.AsyncResult;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Context;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.Future;
Expand Down Expand Up @@ -75,9 +74,9 @@ public Future<String> deployVerticle(Callable<Verticle> verticleSupplier, Deploy

public Future<Void> undeployVerticle(String deploymentID) {
Deployment deployment = deployments.get(deploymentID);
Context currentContext = vertx.getOrCreateContext();
ContextInternal currentContext = vertx.getOrCreateContext();
if (deployment == null) {
return ((ContextInternal) currentContext).failedFuture(new IllegalStateException("Unknown deployment"));
return currentContext.failedFuture(new IllegalStateException("Unknown deployment"));
} else {
return deployment.doUndeploy(vertx.getOrCreateContext());
}
Expand Down Expand Up @@ -255,7 +254,7 @@ private class DeploymentImpl implements Deployment {
private final JsonObject conf;
private final String verticleIdentifier;
private final List<VerticleHolder> verticles = new CopyOnWriteArrayList<>();
private final Set<Deployment> children = new ConcurrentHashSet<>();
private final Set<Deployment> children = Utils.concurrentHashSet();
private final DeploymentOptions options;
private Handler<Void> undeployHandler;
private int status = ST_DEPLOYED;
Expand Down
11 changes: 11 additions & 0 deletions src/main/java/io/vertx/core/impl/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@

import io.netty.util.internal.PlatformDependent;

import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
* Simple generic utility methods and constants
*
Expand Down Expand Up @@ -45,4 +49,11 @@ public static boolean isWindows() {
return isWindows;
}

public static <E> Set<E> concurrentHashSet (){
return Collections.newSetFromMap(new ConcurrentHashMap<>());
}

public static <E> Set<E> concurrentHashSet (int initialSize){
return Collections.newSetFromMap(new ConcurrentHashMap<>(initialSize));
}
}
11 changes: 4 additions & 7 deletions src/test/java/io/vertx/core/ComplexHATest.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,8 @@

package io.vertx.core;

import io.vertx.core.DeploymentOptions;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.core.impl.ConcurrentHashSet;
import io.vertx.core.impl.Deployment;
import io.vertx.core.impl.Utils;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.json.JsonObject;
import io.vertx.core.spi.cluster.ClusterManager;
Expand All @@ -40,11 +37,11 @@
*/
public class ComplexHATest extends VertxTestBase {

protected ClusterManager getClusterManager() {
@Override protected ClusterManager getClusterManager() {
return new FakeClusterManager();
}

private Random random = new Random();
private final Random random = new Random();

protected final int maxVerticlesPerNode = 20;
protected Set<Deployment>[] deploymentSnapshots;
Expand Down Expand Up @@ -162,7 +159,7 @@ protected void takeDeploymentSnapshots() {
}

protected Set<Deployment> takeDeploymentSnapshot(int pos) {
Set<Deployment> snapshot = new ConcurrentHashSet<>();
Set<Deployment> snapshot = Utils.concurrentHashSet();
VertxInternal v = (VertxInternal)vertices[pos];
for (String depID: v.deploymentIDs()) {
snapshot.add(v.getDeployment(depID));
Expand Down
11 changes: 5 additions & 6 deletions src/test/java/io/vertx/core/eventbus/LocalEventBusTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@
import io.vertx.core.*;
import io.vertx.core.eventbus.impl.EventBusInternal;
import io.vertx.core.eventbus.impl.MessageConsumerImpl;
import io.vertx.core.impl.ConcurrentHashSet;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.EventLoopContext;
import io.vertx.core.impl.Utils;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.streams.ReadStream;
import io.vertx.test.core.TestUtils;
Expand All @@ -41,7 +41,7 @@ public class LocalEventBusTest extends EventBusTestBase {
private EventBusInternal eb;
private boolean running;

public void setUp() throws Exception {
@Override public void setUp() throws Exception {
super.setUp();
vertx.close();
vertx = Vertx.vertx();
Expand All @@ -52,7 +52,7 @@ public void setUp() throws Exception {
running = true;
}

protected void tearDown() throws Exception {
@Override protected void tearDown() throws Exception {
closeVertx();
super.tearDown();
}
Expand Down Expand Up @@ -712,7 +712,7 @@ public void start() {

@Test
public void testContextsSend() throws Exception {
Set<ContextInternal> contexts = new ConcurrentHashSet<>();
Set<ContextInternal> contexts = Utils.concurrentHashSet();
CountDownLatch latch = new CountDownLatch(2);
vertx.eventBus().consumer(ADDRESS1).handler(msg -> {
msg.reply("bar");
Expand All @@ -730,7 +730,7 @@ public void testContextsSend() throws Exception {

@Test
public void testContextsPublish() throws Exception {
Set<ContextInternal> contexts = new ConcurrentHashSet<>();
Set<ContextInternal> contexts = Utils.concurrentHashSet();
AtomicInteger cnt = new AtomicInteger();
int numHandlers = 10;
for (int i = 0; i < numHandlers; i++) {
Expand Down Expand Up @@ -1449,4 +1449,3 @@ public void testEarlyTimeoutOnHandlerUnregistration() {
await();
}
}

6 changes: 3 additions & 3 deletions src/test/java/io/vertx/core/http/Http1xTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -1520,7 +1520,7 @@ private void testKeepAlive(boolean keepAlive, int poolSize, int numServers, int
// Start the servers
HttpServer[] servers = new HttpServer[numServers];
CountDownLatch startServerLatch = new CountDownLatch(numServers);
Set<HttpServer> connectedServers = new ConcurrentHashSet<>();
Set<HttpServer> connectedServers = Utils.concurrentHashSet();
for (int i = 0; i < numServers; i++) {
HttpServer server = vertx.createHttpServer(new HttpServerOptions().setHost(DEFAULT_HTTP_HOST).setPort(DEFAULT_HTTP_PORT));
server.requestHandler(req -> {
Expand Down Expand Up @@ -1736,7 +1736,7 @@ public void testSharedServersRoundRobin() throws Exception {
Map<HttpServer, Integer> requestCount = new ConcurrentHashMap<>();

CountDownLatch latchConns = new CountDownLatch(numRequests);
Set<Thread> threads = new ConcurrentHashSet<>();
Set<Thread> threads = Utils.concurrentHashSet();
Future<String> listenLatch = vertx.deployVerticle(() -> new AbstractVerticle() {
Thread thread;
@Override
Expand Down Expand Up @@ -2199,7 +2199,7 @@ public void testContexts() throws Exception {
});
expectedThreads.add(th.get());
}
Set<Thread> threads = new ConcurrentHashSet<>();
Set<Thread> threads = Utils.concurrentHashSet();
for (int i = 0; i < numReqs; i++) {
Context requestCtx = contexts.get(i);
CompletableFuture<Long> cf = new CompletableFuture<>();
Expand Down
5 changes: 3 additions & 2 deletions src/test/java/io/vertx/core/http/WebSocketTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import io.vertx.core.http.impl.WebSocketInternal;
import io.vertx.core.http.impl.ws.WebSocketFrameImpl;
import io.vertx.core.impl.ConcurrentHashSet;
import io.vertx.core.impl.Utils;
import io.vertx.core.net.NetServer;
import io.vertx.core.net.NetSocket;
import io.vertx.core.net.SocketAddress;
Expand Down Expand Up @@ -519,7 +520,7 @@ public void testSharedServersRoundRobin() throws Exception {
int numConnections = numServers * 100;

List<HttpServer> servers = new ArrayList<>();
Set<HttpServer> connectedServers = new ConcurrentHashSet<>();
Set<HttpServer> connectedServers = Utils.concurrentHashSet();
Map<HttpServer, Integer> connectCount = new ConcurrentHashMap<>();

CountDownLatch latchConns = new CountDownLatch(numConnections);
Expand Down Expand Up @@ -3651,7 +3652,7 @@ private <T> void testFanout(T hello, T bye, Function<WebSocketBase, String> hand
String path = "/some/path";
int numConnections = 10;

Set<String> connections = new ConcurrentHashSet<>();
Set<String> connections = Utils.concurrentHashSet();
HttpServerOptions httpServerOptions = new HttpServerOptions()
.setPort(DEFAULT_HTTP_PORT)
.setRegisterWebSocketWriteHandlers(true);
Expand Down
8 changes: 4 additions & 4 deletions src/test/java/io/vertx/core/net/NetTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -1975,8 +1975,8 @@ public void testSharedServersRoundRobin() throws Exception {
int numConnections = numServers * (domainSocket ? 10 : 20);

List<NetServer> servers = new ArrayList<>();
Set<NetServer> connectedServers = new ConcurrentHashSet<>();
Set<Thread> threads = new ConcurrentHashSet<>();
Set<NetServer> connectedServers = Utils.concurrentHashSet();
Set<Thread> threads = Utils.concurrentHashSet();

Future<String> listenLatch = vertx.deployVerticle(() -> new AbstractVerticle() {
NetServer server;
Expand Down Expand Up @@ -2105,7 +2105,7 @@ public void testFanout() throws Exception {

int numConnections = 10;

Set<String> connections = new ConcurrentHashSet<>();
Set<String> connections = Utils.concurrentHashSet();
server.connectHandler(socket -> {
connections.add(socket.writeHandlerID());
if (connections.size() == numConnections) {
Expand Down Expand Up @@ -2552,7 +2552,7 @@ public void testContexts() throws Exception {
}));
awaitLatch(listenLatch);

Set<Context> contexts = new ConcurrentHashSet<>();
Set<Context> contexts = Utils.concurrentHashSet();
AtomicInteger connectCount = new AtomicInteger();
CountDownLatch clientLatch = new CountDownLatch(1);
// Each connect should be in its own context
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.http.ServerWebSocket;
import io.vertx.core.http.WebSocketBase;
import io.vertx.core.impl.ConcurrentHashSet;
import io.vertx.core.impl.Utils;
import io.vertx.core.net.SocketAddress;
import io.vertx.core.spi.metrics.HttpServerMetrics;
import io.vertx.core.spi.observability.HttpRequest;
import io.vertx.core.spi.observability.HttpResponse;

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

Expand All @@ -30,7 +31,7 @@
public class FakeHttpServerMetrics extends FakeTCPMetrics implements HttpServerMetrics<HttpServerMetric, WebSocketMetric, SocketMetric> {

private final ConcurrentMap<WebSocketBase, WebSocketMetric> webSockets = new ConcurrentHashMap<>();
private final ConcurrentHashSet<HttpServerMetric> requests = new ConcurrentHashSet<>();
private final Set<HttpServerMetric> requests = Utils.concurrentHashSet();

public WebSocketMetric getWebSocketMetric(ServerWebSocket ws) {
return webSockets.get(ws);
Expand Down

0 comments on commit efc1f68

Please sign in to comment.