From 113886d587b35f306836b1b80d2b986b8694e021 Mon Sep 17 00:00:00 2001 From: Artem Vysochyn Date: Sun, 3 Sep 2023 18:26:54 +0300 Subject: [PATCH] Merged ServiceMethodRegistry into ServiceRegistry (#841) --- .../io/scalecube/services/ServiceCall.java | 27 ++---- .../methods/ServiceMethodRegistry.java | 15 ---- .../registry/api/ServiceRegistry.java | 8 ++ .../transport/api/ServiceTransport.java | 6 +- .../rsocket/RSocketServerTransport.java | 12 +-- .../rsocket/RSocketServiceAcceptor.java | 26 +++--- .../rsocket/RSocketServiceTransport.java | 6 +- .../io/scalecube/services/Microservices.java | 21 +---- .../methods/ServiceMethodRegistryImpl.java | 88 ------------------- .../registry/ServiceRegistryImpl.java | 73 +++++++++++++++ .../rsocket/RSocketServiceTransportTest.java | 47 ++++++---- 11 files changed, 145 insertions(+), 184 deletions(-) delete mode 100644 services-api/src/main/java/io/scalecube/services/methods/ServiceMethodRegistry.java delete mode 100644 services/src/main/java/io/scalecube/services/methods/ServiceMethodRegistryImpl.java diff --git a/services-api/src/main/java/io/scalecube/services/ServiceCall.java b/services-api/src/main/java/io/scalecube/services/ServiceCall.java index ed899eafa..a09a82489 100644 --- a/services-api/src/main/java/io/scalecube/services/ServiceCall.java +++ b/services-api/src/main/java/io/scalecube/services/ServiceCall.java @@ -7,7 +7,6 @@ import io.scalecube.services.exceptions.ServiceUnavailableException; import io.scalecube.services.methods.MethodInfo; import io.scalecube.services.methods.ServiceMethodInvoker; -import io.scalecube.services.methods.ServiceMethodRegistry; import io.scalecube.services.registry.api.ServiceRegistry; import io.scalecube.services.routing.Router; import io.scalecube.services.routing.Routers; @@ -34,7 +33,6 @@ public class ServiceCall { private static final Logger LOGGER = LoggerFactory.getLogger(ServiceCall.class); private ClientTransport transport; - private ServiceMethodRegistry methodRegistry; private ServiceRegistry serviceRegistry; private Router router; private ServiceClientErrorMapper errorMapper = DefaultErrorMapper.INSTANCE; @@ -45,7 +43,6 @@ public ServiceCall() {} private ServiceCall(ServiceCall other) { this.transport = other.transport; - this.methodRegistry = other.methodRegistry; this.serviceRegistry = other.serviceRegistry; this.router = other.router; this.errorMapper = other.errorMapper; @@ -77,18 +74,6 @@ public ServiceCall serviceRegistry(ServiceRegistry serviceRegistry) { return target; } - /** - * Setter for {@code methodRegistry}. - * - * @param methodRegistry method registry. - * @return new {@link ServiceCall} instance. - */ - public ServiceCall methodRegistry(ServiceMethodRegistry methodRegistry) { - ServiceCall target = new ServiceCall(this); - target.methodRegistry = methodRegistry; - return target; - } - /** * Setter for {@code routerType}. * @@ -180,8 +165,8 @@ public Mono requestOne(ServiceMessage request, Type responseType return Mono.defer( () -> { ServiceMethodInvoker methodInvoker; - if (methodRegistry != null - && (methodInvoker = methodRegistry.getInvoker(request.qualifier())) != null) { + if (serviceRegistry != null + && (methodInvoker = serviceRegistry.getInvoker(request.qualifier())) != null) { // local service return methodInvoker.invokeOne(request).map(this::throwIfError); } else { @@ -219,8 +204,8 @@ public Flux requestMany(ServiceMessage request, Type responseTyp return Flux.defer( () -> { ServiceMethodInvoker methodInvoker; - if (methodRegistry != null - && (methodInvoker = methodRegistry.getInvoker(request.qualifier())) != null) { + if (serviceRegistry != null + && (methodInvoker = serviceRegistry.getInvoker(request.qualifier())) != null) { // local service return methodInvoker.invokeMany(request).map(this::throwIfError); } else { @@ -262,8 +247,8 @@ public Flux requestBidirectional( if (first.hasValue()) { ServiceMessage request = first.get(); ServiceMethodInvoker methodInvoker; - if (methodRegistry != null - && (methodInvoker = methodRegistry.getInvoker(request.qualifier())) != null) { + if (serviceRegistry != null + && (methodInvoker = serviceRegistry.getInvoker(request.qualifier())) != null) { // local service return methodInvoker.invokeBidirectional(messages).map(this::throwIfError); } else { diff --git a/services-api/src/main/java/io/scalecube/services/methods/ServiceMethodRegistry.java b/services-api/src/main/java/io/scalecube/services/methods/ServiceMethodRegistry.java deleted file mode 100644 index 9ea2e1c7e..000000000 --- a/services-api/src/main/java/io/scalecube/services/methods/ServiceMethodRegistry.java +++ /dev/null @@ -1,15 +0,0 @@ -package io.scalecube.services.methods; - -import io.scalecube.services.ServiceInfo; -import java.util.List; - -public interface ServiceMethodRegistry { - - void registerService(ServiceInfo serviceInfo); - - ServiceMethodInvoker getInvoker(String qualifier); - - List listInvokers(); - - List listServices(); -} diff --git a/services-api/src/main/java/io/scalecube/services/registry/api/ServiceRegistry.java b/services-api/src/main/java/io/scalecube/services/registry/api/ServiceRegistry.java index 6f4086fc7..49b0be33f 100644 --- a/services-api/src/main/java/io/scalecube/services/registry/api/ServiceRegistry.java +++ b/services-api/src/main/java/io/scalecube/services/registry/api/ServiceRegistry.java @@ -1,8 +1,10 @@ package io.scalecube.services.registry.api; import io.scalecube.services.ServiceEndpoint; +import io.scalecube.services.ServiceInfo; import io.scalecube.services.ServiceReference; import io.scalecube.services.api.ServiceMessage; +import io.scalecube.services.methods.ServiceMethodInvoker; import java.util.List; /** @@ -20,4 +22,10 @@ public interface ServiceRegistry { boolean registerService(ServiceEndpoint serviceEndpoint); ServiceEndpoint unregisterService(String endpointId); + + void registerService(ServiceInfo serviceInfo); + + List listServices(); + + ServiceMethodInvoker getInvoker(String qualifier); } diff --git a/services-api/src/main/java/io/scalecube/services/transport/api/ServiceTransport.java b/services-api/src/main/java/io/scalecube/services/transport/api/ServiceTransport.java index 40aabf28d..8c9364562 100644 --- a/services-api/src/main/java/io/scalecube/services/transport/api/ServiceTransport.java +++ b/services-api/src/main/java/io/scalecube/services/transport/api/ServiceTransport.java @@ -1,6 +1,6 @@ package io.scalecube.services.transport.api; -import io.scalecube.services.methods.ServiceMethodRegistry; +import io.scalecube.services.registry.api.ServiceRegistry; public interface ServiceTransport { @@ -14,10 +14,10 @@ public interface ServiceTransport { /** * Provider for {@link ServerTransport}. * - * @param methodRegistry methodRegistry + * @param serviceRegistry serviceRegistry * @return {@code ServerTransport} instance */ - ServerTransport serverTransport(ServiceMethodRegistry methodRegistry); + ServerTransport serverTransport(ServiceRegistry serviceRegistry); /** * Starts {@link ServiceTransport} instance. diff --git a/services-transport-parent/services-transport-rsocket/src/main/java/io/scalecube/services/transport/rsocket/RSocketServerTransport.java b/services-transport-parent/services-transport-rsocket/src/main/java/io/scalecube/services/transport/rsocket/RSocketServerTransport.java index 6bcd4a72d..e14bf5350 100644 --- a/services-transport-parent/services-transport-rsocket/src/main/java/io/scalecube/services/transport/rsocket/RSocketServerTransport.java +++ b/services-transport-parent/services-transport-rsocket/src/main/java/io/scalecube/services/transport/rsocket/RSocketServerTransport.java @@ -5,7 +5,7 @@ import io.rsocket.transport.netty.server.CloseableChannel; import io.scalecube.net.Address; import io.scalecube.services.auth.Authenticator; -import io.scalecube.services.methods.ServiceMethodRegistry; +import io.scalecube.services.registry.api.ServiceRegistry; import io.scalecube.services.transport.api.DataCodec; import io.scalecube.services.transport.api.HeadersCodec; import io.scalecube.services.transport.api.ServerTransport; @@ -18,7 +18,7 @@ public class RSocketServerTransport implements ServerTransport { private static final Logger LOGGER = LoggerFactory.getLogger(RSocketServerTransport.class); private final Authenticator authenticator; - private final ServiceMethodRegistry methodRegistry; + private final ServiceRegistry serviceRegistry; private final ConnectionSetupCodec connectionSetupCodec; private final HeadersCodec headersCodec; private final Collection dataCodecs; @@ -30,7 +30,7 @@ public class RSocketServerTransport implements ServerTransport { * Constructor for this server transport. * * @param authenticator authenticator - * @param methodRegistry methodRegistry + * @param serviceRegistry serviceRegistry * @param connectionSetupCodec connectionSetupCodec * @param headersCodec headersCodec * @param dataCodecs dataCodecs @@ -38,13 +38,13 @@ public class RSocketServerTransport implements ServerTransport { */ public RSocketServerTransport( Authenticator authenticator, - ServiceMethodRegistry methodRegistry, + ServiceRegistry serviceRegistry, ConnectionSetupCodec connectionSetupCodec, HeadersCodec headersCodec, Collection dataCodecs, RSocketServerTransportFactory serverTransportFactory) { this.authenticator = authenticator; - this.methodRegistry = methodRegistry; + this.serviceRegistry = serviceRegistry; this.connectionSetupCodec = connectionSetupCodec; this.headersCodec = headersCodec; this.dataCodecs = dataCodecs; @@ -64,7 +64,7 @@ public ServerTransport bind() { RSocketServer.create() .acceptor( new RSocketServiceAcceptor( - connectionSetupCodec, headersCodec, dataCodecs, authenticator, methodRegistry)) + connectionSetupCodec, headersCodec, dataCodecs, authenticator, serviceRegistry)) .payloadDecoder(PayloadDecoder.DEFAULT) .bind(serverTransportFactory.serverTransport()) .doOnSuccess(channel -> serverChannel = channel) diff --git a/services-transport-parent/services-transport-rsocket/src/main/java/io/scalecube/services/transport/rsocket/RSocketServiceAcceptor.java b/services-transport-parent/services-transport-rsocket/src/main/java/io/scalecube/services/transport/rsocket/RSocketServiceAcceptor.java index 192d52624..f0b7714d9 100644 --- a/services-transport-parent/services-transport-rsocket/src/main/java/io/scalecube/services/transport/rsocket/RSocketServiceAcceptor.java +++ b/services-transport-parent/services-transport-rsocket/src/main/java/io/scalecube/services/transport/rsocket/RSocketServiceAcceptor.java @@ -17,7 +17,7 @@ import io.scalecube.services.exceptions.ServiceUnavailableException; import io.scalecube.services.exceptions.UnauthorizedException; import io.scalecube.services.methods.ServiceMethodInvoker; -import io.scalecube.services.methods.ServiceMethodRegistry; +import io.scalecube.services.registry.api.ServiceRegistry; import io.scalecube.services.transport.api.DataCodec; import io.scalecube.services.transport.api.HeadersCodec; import java.util.Collection; @@ -36,7 +36,7 @@ public class RSocketServiceAcceptor implements SocketAcceptor { private final HeadersCodec headersCodec; private final Collection dataCodecs; private final Authenticator authenticator; - private final ServiceMethodRegistry methodRegistry; + private final ServiceRegistry serviceRegistry; /** * Constructor. @@ -45,19 +45,19 @@ public class RSocketServiceAcceptor implements SocketAcceptor { * @param headersCodec headersCodec * @param dataCodecs dataCodecs * @param authenticator authenticator - * @param methodRegistry methodRegistry + * @param serviceRegistry serviceRegistry */ public RSocketServiceAcceptor( ConnectionSetupCodec connectionSetupCodec, HeadersCodec headersCodec, Collection dataCodecs, Authenticator authenticator, - ServiceMethodRegistry methodRegistry) { + ServiceRegistry serviceRegistry) { this.connectionSetupCodec = connectionSetupCodec; this.headersCodec = headersCodec; this.dataCodecs = dataCodecs; this.authenticator = authenticator; - this.methodRegistry = methodRegistry; + this.serviceRegistry = serviceRegistry; } @Override @@ -99,7 +99,7 @@ private Mono authenticate(RSocket rsocket, ConnectionSetup connectionSet private RSocket newRSocket(Object authData) { return new RSocketImpl( - authData, new ServiceMessageCodec(headersCodec, dataCodecs), methodRegistry); + authData, new ServiceMessageCodec(headersCodec, dataCodecs), serviceRegistry); } private UnauthorizedException toUnauthorizedException(Throwable th) { @@ -115,13 +115,13 @@ private static class RSocketImpl implements RSocket { private final Object authData; private final ServiceMessageCodec messageCodec; - private final ServiceMethodRegistry methodRegistry; + private final ServiceRegistry serviceRegistry; private RSocketImpl( - Object authData, ServiceMessageCodec messageCodec, ServiceMethodRegistry methodRegistry) { + Object authData, ServiceMessageCodec messageCodec, ServiceRegistry serviceRegistry) { this.authData = authData; this.messageCodec = messageCodec; - this.methodRegistry = methodRegistry; + this.serviceRegistry = serviceRegistry; } @Override @@ -130,7 +130,8 @@ public Mono requestResponse(Payload payload) { .doOnNext(this::validateRequest) .flatMap( message -> { - ServiceMethodInvoker methodInvoker = methodRegistry.getInvoker(message.qualifier()); + ServiceMethodInvoker methodInvoker = + serviceRegistry.getInvoker(message.qualifier()); validateMethodInvoker(methodInvoker, message); return methodInvoker .invokeOne(message) @@ -147,7 +148,8 @@ public Flux requestStream(Payload payload) { .doOnNext(this::validateRequest) .flatMapMany( message -> { - ServiceMethodInvoker methodInvoker = methodRegistry.getInvoker(message.qualifier()); + ServiceMethodInvoker methodInvoker = + serviceRegistry.getInvoker(message.qualifier()); validateMethodInvoker(methodInvoker, message); return methodInvoker .invokeMany(message) @@ -168,7 +170,7 @@ public Flux requestChannel(Publisher payloads) { ServiceMessage message = first.get(); validateRequest(message); ServiceMethodInvoker methodInvoker = - methodRegistry.getInvoker(message.qualifier()); + serviceRegistry.getInvoker(message.qualifier()); validateMethodInvoker(methodInvoker, message); return methodInvoker .invokeBidirectional(messages) diff --git a/services-transport-parent/services-transport-rsocket/src/main/java/io/scalecube/services/transport/rsocket/RSocketServiceTransport.java b/services-transport-parent/services-transport-rsocket/src/main/java/io/scalecube/services/transport/rsocket/RSocketServiceTransport.java index c99f950be..41b684942 100644 --- a/services-transport-parent/services-transport-rsocket/src/main/java/io/scalecube/services/transport/rsocket/RSocketServiceTransport.java +++ b/services-transport-parent/services-transport-rsocket/src/main/java/io/scalecube/services/transport/rsocket/RSocketServiceTransport.java @@ -9,7 +9,7 @@ import io.scalecube.services.auth.Authenticator; import io.scalecube.services.auth.CredentialsSupplier; import io.scalecube.services.exceptions.ConnectionClosedException; -import io.scalecube.services.methods.ServiceMethodRegistry; +import io.scalecube.services.registry.api.ServiceRegistry; import io.scalecube.services.transport.api.ClientTransport; import io.scalecube.services.transport.api.DataCodec; import io.scalecube.services.transport.api.HeadersCodec; @@ -195,10 +195,10 @@ public ClientTransport clientTransport() { } @Override - public ServerTransport serverTransport(ServiceMethodRegistry methodRegistry) { + public ServerTransport serverTransport(ServiceRegistry serviceRegistry) { return new RSocketServerTransport( authenticator, - methodRegistry, + serviceRegistry, connectionSetupCodec, headersCodec, dataCodecs, diff --git a/services/src/main/java/io/scalecube/services/Microservices.java b/services/src/main/java/io/scalecube/services/Microservices.java index c437dd52d..ce808145e 100644 --- a/services/src/main/java/io/scalecube/services/Microservices.java +++ b/services/src/main/java/io/scalecube/services/Microservices.java @@ -15,8 +15,6 @@ import io.scalecube.services.exceptions.ServiceProviderErrorMapper; import io.scalecube.services.gateway.Gateway; import io.scalecube.services.gateway.GatewayOptions; -import io.scalecube.services.methods.ServiceMethodRegistry; -import io.scalecube.services.methods.ServiceMethodRegistryImpl; import io.scalecube.services.registry.ServiceRegistryImpl; import io.scalecube.services.registry.api.ServiceRegistry; import io.scalecube.services.routing.RoundRobinServiceRouter; @@ -124,7 +122,6 @@ public final class Microservices implements AutoCloseable { private final Map tags; private final List serviceProviders; private final ServiceRegistry serviceRegistry; - private final ServiceMethodRegistry methodRegistry; private final Authenticator defaultAuthenticator; private final ServiceTransportBootstrap transportBootstrap; private final GatewayBootstrap gatewayBootstrap; @@ -143,7 +140,6 @@ private Microservices(Builder builder) { this.tags = Collections.unmodifiableMap(new HashMap<>(builder.tags)); this.serviceProviders = new ArrayList<>(builder.serviceProviders); this.serviceRegistry = builder.serviceRegistry; - this.methodRegistry = builder.methodRegistry; this.defaultAuthenticator = builder.defaultAuthenticator; this.gatewayBootstrap = builder.gatewayBootstrap; this.discoveryBootstrap = builder.discoveryBootstrap; @@ -241,7 +237,7 @@ private Mono concludeDiscovery( } private void registerService(ServiceInfo serviceInfo) { - methodRegistry.registerService( + serviceRegistry.registerService( ServiceInfo.from(serviceInfo) .errorMapperIfAbsent(defaultErrorMapper) .dataDecoderIfAbsent(defaultDataDecoder) @@ -258,7 +254,6 @@ public ServiceCall call() { return new ServiceCall() .transport(transportBootstrap.clientTransport) .serviceRegistry(serviceRegistry) - .methodRegistry(methodRegistry) .contentType(defaultContentType) .errorMapper(DefaultErrorMapper.INSTANCE) .router(Routers.getRouter(RoundRobinServiceRouter.class)); @@ -288,10 +283,6 @@ public ServiceRegistry serviceRegistry() { return serviceRegistry; } - public ServiceMethodRegistry methodRegistry() { - return methodRegistry; - } - public Address discoveryAddress() { return discoveryBootstrap.serviceDiscovery != null ? discoveryBootstrap.serviceDiscovery.address() @@ -350,7 +341,7 @@ private Mono applyBeforeDestroy() { return Mono.defer( () -> Mono.whenDelayError( - methodRegistry.listServices().stream() + serviceRegistry.listServices().stream() .map(ServiceInfo::serviceInstance) .map(s -> Mono.fromRunnable(() -> Injector.processBeforeDestroy(this, s))) .collect(Collectors.toList()))); @@ -370,7 +361,6 @@ public static final class Builder { private Map tags = new HashMap<>(); private final List serviceProviders = new ArrayList<>(); private ServiceRegistry serviceRegistry = new ServiceRegistryImpl(); - private ServiceMethodRegistry methodRegistry = new ServiceMethodRegistryImpl(); private Authenticator defaultAuthenticator = null; private final ServiceDiscoveryBootstrap discoveryBootstrap = new ServiceDiscoveryBootstrap(); private ServiceTransportBootstrap transportBootstrap = new ServiceTransportBootstrap(); @@ -436,11 +426,6 @@ public Builder serviceRegistry(ServiceRegistry serviceRegistry) { return this; } - public Builder methodRegistry(ServiceMethodRegistry methodRegistry) { - this.methodRegistry = methodRegistry; - return this; - } - public Builder discovery(ServiceDiscoveryFactory discoveryFactory) { this.discoveryBootstrap.operator(options -> options.discoveryFactory(discoveryFactory)); return this; @@ -717,7 +702,7 @@ private ServiceTransportBootstrap start(Microservices microservices) { try { try { serviceTransport = serviceTransport.start(); - serverTransport = serviceTransport.serverTransport(microservices.methodRegistry).bind(); + serverTransport = serviceTransport.serverTransport(microservices.serviceRegistry).bind(); } catch (Exception e) { throw new RuntimeException(e); } diff --git a/services/src/main/java/io/scalecube/services/methods/ServiceMethodRegistryImpl.java b/services/src/main/java/io/scalecube/services/methods/ServiceMethodRegistryImpl.java deleted file mode 100644 index 9d29275c4..000000000 --- a/services/src/main/java/io/scalecube/services/methods/ServiceMethodRegistryImpl.java +++ /dev/null @@ -1,88 +0,0 @@ -package io.scalecube.services.methods; - -import io.scalecube.services.Reflect; -import io.scalecube.services.ServiceInfo; -import java.util.ArrayList; -import java.util.List; -import java.util.Objects; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.CopyOnWriteArrayList; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public final class ServiceMethodRegistryImpl implements ServiceMethodRegistry { - - private static final Logger LOGGER = LoggerFactory.getLogger(ServiceMethodRegistry.class); - - private final List serviceInfos = new CopyOnWriteArrayList<>(); - - private final ConcurrentMap methodInvokers = - new ConcurrentHashMap<>(); - - @Override - public void registerService(ServiceInfo serviceInfo) { - serviceInfos.add(serviceInfo); - - Reflect.serviceInterfaces(serviceInfo.serviceInstance()) - .forEach( - serviceInterface -> - Reflect.serviceMethods(serviceInterface) - .forEach( - (key, method) -> { - - // validate method - Reflect.validateMethodOrThrow(method); - - MethodInfo methodInfo = - new MethodInfo( - Reflect.serviceName(serviceInterface), - Reflect.methodName(method), - Reflect.parameterizedReturnType(method), - Reflect.isReturnTypeServiceMessage(method), - Reflect.communicationMode(method), - method.getParameterCount(), - Reflect.requestType(method), - Reflect.isRequestTypeServiceMessage(method), - Reflect.isSecured(method)); - - checkMethodInvokerDoesntExist(methodInfo); - - ServiceMethodInvoker methodInvoker = - new ServiceMethodInvoker( - method, - serviceInfo.serviceInstance(), - methodInfo, - serviceInfo.errorMapper(), - serviceInfo.dataDecoder(), - serviceInfo.authenticator(), - serviceInfo.principalMapper()); - - methodInvokers.put(methodInfo.qualifier(), methodInvoker); - methodInvokers.put(methodInfo.oldQualifier(), methodInvoker); - })); - } - - private void checkMethodInvokerDoesntExist(MethodInfo methodInfo) { - if (methodInvokers.containsKey(methodInfo.qualifier()) - || methodInvokers.containsKey(methodInfo.oldQualifier())) { - LOGGER.error("MethodInvoker already exists, methodInfo: {}", methodInfo); - throw new IllegalStateException("MethodInvoker already exists"); - } - } - - @Override - public ServiceMethodInvoker getInvoker(String qualifier) { - return methodInvokers.get(Objects.requireNonNull(qualifier, "[getInvoker] qualifier")); - } - - @Override - public List listInvokers() { - return new ArrayList<>(methodInvokers.values()); - } - - @Override - public List listServices() { - return new ArrayList<>(serviceInfos); - } -} diff --git a/services/src/main/java/io/scalecube/services/registry/ServiceRegistryImpl.java b/services/src/main/java/io/scalecube/services/registry/ServiceRegistryImpl.java index 9c58a402d..7619703f9 100644 --- a/services/src/main/java/io/scalecube/services/registry/ServiceRegistryImpl.java +++ b/services/src/main/java/io/scalecube/services/registry/ServiceRegistryImpl.java @@ -1,14 +1,21 @@ package io.scalecube.services.registry; +import io.scalecube.services.Reflect; import io.scalecube.services.ServiceEndpoint; +import io.scalecube.services.ServiceInfo; import io.scalecube.services.ServiceReference; import io.scalecube.services.api.ServiceMessage; +import io.scalecube.services.methods.MethodInfo; +import io.scalecube.services.methods.ServiceMethodInvoker; import io.scalecube.services.registry.api.ServiceRegistry; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.stream.Collectors; import org.jctools.maps.NonBlockingHashMap; @@ -23,6 +30,11 @@ public class ServiceRegistryImpl implements ServiceRegistry { private final Map serviceEndpoints = new NonBlockingHashMap<>(); private final Map> serviceReferencesByQualifier = new NonBlockingHashMap<>(); + private final List serviceInfos = new CopyOnWriteArrayList<>(); + private final ConcurrentMap methodInvokers = + new ConcurrentHashMap<>(); + + public ServiceRegistryImpl() {} @Override public List listServiceEndpoints() { @@ -86,6 +98,67 @@ public ServiceEndpoint unregisterService(String endpointId) { return serviceEndpoint; } + @Override + public void registerService(ServiceInfo serviceInfo) { + serviceInfos.add(serviceInfo); + + Reflect.serviceInterfaces(serviceInfo.serviceInstance()) + .forEach( + serviceInterface -> + Reflect.serviceMethods(serviceInterface) + .forEach( + (key, method) -> { + + // validate method + Reflect.validateMethodOrThrow(method); + + MethodInfo methodInfo = + new MethodInfo( + Reflect.serviceName(serviceInterface), + Reflect.methodName(method), + Reflect.parameterizedReturnType(method), + Reflect.isReturnTypeServiceMessage(method), + Reflect.communicationMode(method), + method.getParameterCount(), + Reflect.requestType(method), + Reflect.isRequestTypeServiceMessage(method), + Reflect.isSecured(method)); + + checkMethodInvokerDoesntExist(methodInfo); + + ServiceMethodInvoker methodInvoker = + new ServiceMethodInvoker( + method, + serviceInfo.serviceInstance(), + methodInfo, + serviceInfo.errorMapper(), + serviceInfo.dataDecoder(), + serviceInfo.authenticator(), + serviceInfo.principalMapper()); + + methodInvokers.put(methodInfo.qualifier(), methodInvoker); + methodInvokers.put(methodInfo.oldQualifier(), methodInvoker); + })); + } + + private void checkMethodInvokerDoesntExist(MethodInfo methodInfo) { + if (methodInvokers.containsKey(methodInfo.qualifier()) + || methodInvokers.containsKey(methodInfo.oldQualifier())) { + LOGGER.error("MethodInvoker already exists, methodInfo: {}", methodInfo); + throw new IllegalStateException("MethodInvoker already exists"); + } + } + + @Override + public ServiceMethodInvoker getInvoker(String qualifier) { + return methodInvokers.get(Objects.requireNonNull(qualifier, "[getInvoker] qualifier")); + } + + @Override + public List listServices() { + return serviceInfos; + } + private void populateServiceReferences(String qualifier, ServiceReference serviceReference) { serviceReferencesByQualifier .computeIfAbsent(qualifier, key -> new CopyOnWriteArrayList<>()) diff --git a/services/src/test/java/io/scalecube/services/transport/rsocket/RSocketServiceTransportTest.java b/services/src/test/java/io/scalecube/services/transport/rsocket/RSocketServiceTransportTest.java index eb4e89599..6838a4595 100644 --- a/services/src/test/java/io/scalecube/services/transport/rsocket/RSocketServiceTransportTest.java +++ b/services/src/test/java/io/scalecube/services/transport/rsocket/RSocketServiceTransportTest.java @@ -2,6 +2,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; import io.scalecube.net.Address; import io.scalecube.services.BaseTest; @@ -33,6 +34,7 @@ public class RSocketServiceTransportTest extends BaseTest { ServiceMessage.builder().qualifier(QuoteService.NAME, "justManyNever").build(); private static final ServiceMessage ONLY_ONE_AND_THEN_NEVER = ServiceMessage.builder().qualifier(QuoteService.NAME, "onlyOneAndThenNever").build(); + public static final Duration TIMEOUT = Duration.ofSeconds(6); private Microservices gateway; private Microservices serviceNode; @@ -80,7 +82,7 @@ public void cleanUp() { public void test_remote_node_died_mono_never() throws Exception { int batchSize = 1; - final CountDownLatch latch1 = new CountDownLatch(batchSize); + final CountDownLatch latch = new CountDownLatch(batchSize); AtomicReference sub1 = new AtomicReference<>(null); AtomicReference exceptionHolder = new AtomicReference<>(null); @@ -90,16 +92,19 @@ public void test_remote_node_died_mono_never() throws Exception { gateway .listenDiscovery() .filter(ServiceDiscoveryEvent::isEndpointRemoved) - .subscribe(onNext -> latch1.countDown(), System.err::println); + .subscribe(onNext -> latch.countDown(), System.err::println); // service node goes down TimeUnit.SECONDS.sleep(3); - serviceNode.shutdown().block(Duration.ofSeconds(6)); + serviceNode.shutdown().block(TIMEOUT); - latch1.await(20, TimeUnit.SECONDS); - TimeUnit.MILLISECONDS.sleep(100); + if (!latch.await(20, TimeUnit.SECONDS)) { + fail("latch.await"); + } + + TimeUnit.MILLISECONDS.sleep(1000); - assertEquals(0, latch1.getCount()); + assertEquals(0, latch.getCount()); assertEquals(ConnectionClosedException.class, exceptionHolder.get().getClass()); assertTrue(sub1.get().isDisposed()); } @@ -108,7 +113,7 @@ public void test_remote_node_died_mono_never() throws Exception { public void test_remote_node_died_many_never() throws Exception { int batchSize = 1; - final CountDownLatch latch1 = new CountDownLatch(batchSize); + final CountDownLatch latch = new CountDownLatch(batchSize); AtomicReference sub1 = new AtomicReference<>(null); AtomicReference exceptionHolder = new AtomicReference<>(null); @@ -118,16 +123,19 @@ public void test_remote_node_died_many_never() throws Exception { gateway .listenDiscovery() .filter(ServiceDiscoveryEvent::isEndpointRemoved) - .subscribe(onNext -> latch1.countDown(), System.err::println); + .subscribe(onNext -> latch.countDown(), System.err::println); // service node goes down TimeUnit.SECONDS.sleep(3); - serviceNode.shutdown().block(Duration.ofSeconds(6)); + serviceNode.shutdown().block(TIMEOUT); + + if (!latch.await(20, TimeUnit.SECONDS)) { + fail("latch.await"); + } - latch1.await(20, TimeUnit.SECONDS); - TimeUnit.MILLISECONDS.sleep(100); + TimeUnit.MILLISECONDS.sleep(1000); - assertEquals(0, latch1.getCount()); + assertEquals(0, latch.getCount()); assertEquals(ConnectionClosedException.class, exceptionHolder.get().getClass()); assertTrue(sub1.get().isDisposed()); } @@ -136,7 +144,7 @@ public void test_remote_node_died_many_never() throws Exception { public void test_remote_node_died_many_then_never() throws Exception { int batchSize = 1; - final CountDownLatch latch1 = new CountDownLatch(batchSize); + final CountDownLatch latch = new CountDownLatch(batchSize); AtomicReference sub1 = new AtomicReference<>(null); AtomicReference exceptionHolder = new AtomicReference<>(null); @@ -150,16 +158,19 @@ public void test_remote_node_died_many_then_never() throws Exception { gateway .listenDiscovery() .filter(ServiceDiscoveryEvent::isEndpointRemoved) - .subscribe(onNext -> latch1.countDown(), System.err::println); + .subscribe(onNext -> latch.countDown(), System.err::println); // service node goes down TimeUnit.SECONDS.sleep(3); - serviceNode.shutdown().block(Duration.ofSeconds(6)); + serviceNode.shutdown().block(TIMEOUT); + + if (!latch.await(20, TimeUnit.SECONDS)) { + fail("latch.await"); + } - latch1.await(20, TimeUnit.SECONDS); - TimeUnit.MILLISECONDS.sleep(100); + TimeUnit.MILLISECONDS.sleep(1000); - assertEquals(0, latch1.getCount()); + assertEquals(0, latch.getCount()); assertEquals(ConnectionClosedException.class, exceptionHolder.get().getClass()); assertTrue(sub1.get().isDisposed()); }