From c0ee676dea85efdcffd02293a6bedbfc8ef61320 Mon Sep 17 00:00:00 2001 From: Oleh Dokuka Date: Thu, 30 Aug 2018 01:09:10 +0300 Subject: [PATCH 1/2] darft of reactive Hazelcast session repository --- .../spring-session-hazelcast.gradle | 3 + .../HazelcastReactiveSessionRepository.java | 502 ++++++++++++++++++ ...zelcastReactiveSessionRepositoryTests.java | 381 +++++++++++++ 3 files changed, 886 insertions(+) create mode 100644 spring-session-hazelcast/src/main/java/org/springframework/session/hazelcast/HazelcastReactiveSessionRepository.java create mode 100644 spring-session-hazelcast/src/test/java/org/springframework/session/hazelcast/HazelcastReactiveSessionRepositoryTests.java diff --git a/spring-session-hazelcast/spring-session-hazelcast.gradle b/spring-session-hazelcast/spring-session-hazelcast.gradle index ae4d5c5f4..f980ce5f9 100644 --- a/spring-session-hazelcast/spring-session-hazelcast.gradle +++ b/spring-session-hazelcast/spring-session-hazelcast.gradle @@ -1,6 +1,9 @@ apply plugin: 'io.spring.convention.spring-module' dependencies { + + optional "io.projectreactor:reactor-core" + compile project(':spring-session-core') compile "com.hazelcast:hazelcast" compile "org.springframework:spring-context" diff --git a/spring-session-hazelcast/src/main/java/org/springframework/session/hazelcast/HazelcastReactiveSessionRepository.java b/spring-session-hazelcast/src/main/java/org/springframework/session/hazelcast/HazelcastReactiveSessionRepository.java new file mode 100644 index 000000000..81dda7585 --- /dev/null +++ b/spring-session-hazelcast/src/main/java/org/springframework/session/hazelcast/HazelcastReactiveSessionRepository.java @@ -0,0 +1,502 @@ +/* + * Copyright 2014-2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.session.hazelcast; + +import java.time.Duration; +import java.time.Instant; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; + +import com.hazelcast.core.EntryEvent; +import com.hazelcast.core.ExecutionCallback; +import com.hazelcast.core.HazelcastInstance; +import com.hazelcast.core.ICompletableFuture; +import com.hazelcast.core.IMap; +import com.hazelcast.map.listener.EntryAddedListener; +import com.hazelcast.map.listener.EntryEvictedListener; +import com.hazelcast.map.listener.EntryRemovedListener; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import reactor.core.CoreSubscriber; +import reactor.core.Exceptions; +import reactor.core.Fuseable; +import reactor.core.Scannable; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.publisher.Operators; + +import org.springframework.context.ApplicationEvent; +import org.springframework.context.ApplicationEventPublisher; +import org.springframework.session.MapSession; +import org.springframework.session.ReactiveSessionRepository; +import org.springframework.session.Session; +import org.springframework.session.events.AbstractSessionEvent; +import org.springframework.session.events.SessionCreatedEvent; +import org.springframework.session.events.SessionDeletedEvent; +import org.springframework.session.events.SessionExpiredEvent; +import org.springframework.util.Assert; + +/** + * A {@link org.springframework.session.SessionRepository} implementation that stores + * sessions in Hazelcast's distributed {@link IMap}. + * + *

+ * An example of how to create a new instance can be seen below: + * + *

+ * Config config = new Config();
+ *
+ * // ... configure Hazelcast ...
+ *
+ * HazelcastInstance hazelcastInstance = Hazelcast.newHazelcastInstance(config);
+ *
+ * HazelcastSessionRepository sessionRepository =
+ *         new HazelcastSessionRepository(hazelcastInstance);
+ * 
+ * + * In order to support finding sessions by principal name using + * {@link #findByIndexNameAndIndexValue(String, String)} method, custom configuration of + * {@code IMap} supplied to this implementation is required. + * + * The following snippet demonstrates how to define required configuration using + * programmatic Hazelcast Configuration: + * + *
+ * MapAttributeConfig attributeConfig = new MapAttributeConfig()
+ *         .setName(HazelcastSessionRepository.PRINCIPAL_NAME_ATTRIBUTE)
+ *         .setExtractor(PrincipalNameExtractor.class.getName());
+ *
+ * Config config = new Config();
+ *
+ * config.getMapConfig(HazelcastSessionRepository.DEFAULT_SESSION_MAP_NAME)
+ *         .addMapAttributeConfig(attributeConfig)
+ *         .addMapIndexConfig(new MapIndexConfig(
+ *                 HazelcastSessionRepository.PRINCIPAL_NAME_ATTRIBUTE, false));
+ *
+ * Hazelcast.newHazelcastInstance(config);
+ * 
+ * + * This implementation listens for events on the Hazelcast-backed SessionRepository and + * translates those events into the corresponding Spring Session events. Publish the + * Spring Session events with the given {@link ApplicationEventPublisher}. + * + * + * + * @author Vedran Pavic + * @author Tommy Ludwig + * @author Mark Anderson + * @author Aleksandar Stojsavljevic + * @since 1.3.0 + */ +public class HazelcastReactiveSessionRepository implements + ReactiveSessionRepository, + EntryAddedListener, EntryEvictedListener, + EntryRemovedListener { + + /** + * The default name of map used by Spring Session to store sessions. + */ + public static final String DEFAULT_SESSION_MAP_NAME = "spring:session:sessions"; + + private static final Log logger = LogFactory.getLog(HazelcastReactiveSessionRepository.class); + + private final HazelcastInstance hazelcastInstance; + + private ApplicationEventPublisher eventPublisher = new ApplicationEventPublisher() { + + @Override + public void publishEvent(ApplicationEvent event) { + } + + @Override + public void publishEvent(Object event) { + } + + }; + + /** + * If non-null, this value is used to override + * {@link MapSession#setMaxInactiveInterval(Duration)}. + */ + private Integer defaultMaxInactiveInterval; + + private String sessionMapName = DEFAULT_SESSION_MAP_NAME; + + private HazelcastFlushMode hazelcastFlushMode = HazelcastFlushMode.ON_SAVE; + + private IMap sessions; + + private String sessionListenerId; + + public HazelcastReactiveSessionRepository(HazelcastInstance hazelcastInstance) { + Assert.notNull(hazelcastInstance, "HazelcastInstance must not be null"); + this.hazelcastInstance = hazelcastInstance; + } + + @PostConstruct + public void init() { + this.sessions = this.hazelcastInstance.getMap(this.sessionMapName); + this.sessionListenerId = this.sessions.addEntryListener(this, true); + } + + @PreDestroy + public void close() { + this.sessions.removeEntryListener(this.sessionListenerId); + } + + /** + * Sets the {@link ApplicationEventPublisher} that is used to publish + * {@link AbstractSessionEvent session events}. The default is to not publish session + * events. + * + * @param applicationEventPublisher the {@link ApplicationEventPublisher} that is used + * to publish session events. Cannot be null. + */ + public void setApplicationEventPublisher( + ApplicationEventPublisher applicationEventPublisher) { + Assert.notNull(applicationEventPublisher, + "ApplicationEventPublisher cannot be null"); + this.eventPublisher = applicationEventPublisher; + } + + /** + * Set the maximum inactive interval in seconds between requests before newly created + * sessions will be invalidated. A negative time indicates that the session will never + * timeout. The default is 1800 (30 minutes). + * @param defaultMaxInactiveInterval the maximum inactive interval in seconds + */ + public void setDefaultMaxInactiveInterval(Integer defaultMaxInactiveInterval) { + this.defaultMaxInactiveInterval = defaultMaxInactiveInterval; + } + + /** + * Set the name of map used to store sessions. + * @param sessionMapName the session map name + */ + public void setSessionMapName(String sessionMapName) { + Assert.hasText(sessionMapName, "Map name must not be empty"); + this.sessionMapName = sessionMapName; + } + + /** + * Sets the Hazelcast flush mode. Default flush mode is + * {@link HazelcastFlushMode#ON_SAVE}. + * @param hazelcastFlushMode the new Hazelcast flush mode + */ + public void setHazelcastFlushMode(HazelcastFlushMode hazelcastFlushMode) { + Assert.notNull(hazelcastFlushMode, "HazelcastFlushMode cannot be null"); + this.hazelcastFlushMode = hazelcastFlushMode; + } + + @Override + public Mono createSession() { + HazelcastSession result = new HazelcastSession(); + if (this.defaultMaxInactiveInterval != null) { + result.setMaxInactiveInterval( + Duration.ofSeconds(this.defaultMaxInactiveInterval)); + } + return Mono.just(result); + } + + @Override + public Mono save(HazelcastSession session) { + Mono result = Mono.empty(); + if (session.isNew) { + result = new MonoICompletableFuture<>(this.sessions.setAsync(session.getId(), session.getDelegate(), + session.getMaxInactiveInterval().getSeconds(), TimeUnit.SECONDS)); + } + else if (session.sessionIdChanged) { + String originalId = session.originalId; + session.originalId = session.getId(); + result = Flux.merge( + new MonoICompletableFuture<>(this.sessions.removeAsync(originalId)), + new MonoICompletableFuture<>(this.sessions.setAsync(session.getId(), session.getDelegate(), + session.getMaxInactiveInterval().getSeconds(), TimeUnit.SECONDS)) + ).then(); + } + else if (session.hasChanges()) { + SessionUpdateEntryProcessor entryProcessor = new SessionUpdateEntryProcessor(); + if (session.lastAccessedTimeChanged) { + entryProcessor.setLastAccessedTime(session.getLastAccessedTime()); + } + if (session.maxInactiveIntervalChanged) { + entryProcessor.setMaxInactiveInterval(session.getMaxInactiveInterval()); + } + if (!session.delta.isEmpty()) { + entryProcessor.setDelta(session.delta); + } + result = new MonoICompletableFuture<>(this.sessions.submitToKey(session.getId(), entryProcessor)); + } + return result.then(Mono.fromRunnable(session::clearChangeFlags)); + } + + @Override + public Mono findById(String id) { + return new MonoICompletableFuture<>(this.sessions.getAsync(id)) + .flatMap(saved -> { + if (saved.isExpired()) { + return deleteById(saved.getId()).then(Mono.empty()); + } + + return Mono.just(new HazelcastSession(saved)); + }); + } + + @Override + public Mono deleteById(String id) { + return new MonoICompletableFuture<>(this.sessions.removeAsync(id)) + .then(); + } + + @Override + public void entryAdded(EntryEvent event) { + MapSession session = event.getValue(); + if (session.getId().equals(session.getOriginalId())) { + if (logger.isDebugEnabled()) { + logger.debug("Session created with id: " + session.getId()); + } + this.eventPublisher.publishEvent(new SessionCreatedEvent(this, session)); + } + } + + @Override + public void entryEvicted(EntryEvent event) { + if (logger.isDebugEnabled()) { + logger.debug("Session expired with id: " + event.getOldValue().getId()); + } + this.eventPublisher + .publishEvent(new SessionExpiredEvent(this, event.getOldValue())); + } + + @Override + public void entryRemoved(EntryEvent event) { + MapSession session = event.getOldValue(); + if (session != null) { + if (logger.isDebugEnabled()) { + logger.debug("Session deleted with id: " + session.getId()); + } + this.eventPublisher.publishEvent(new SessionDeletedEvent(this, session)); + } + } + + /** + * A custom implementation of {@link Session} that uses a {@link MapSession} as the + * basis for its mapping. It keeps track if changes have been made since last save. + * + * @author Aleksandar Stojsavljevic + */ + final class HazelcastSession implements Session { + + private final MapSession delegate; + + private boolean isNew; + + private boolean sessionIdChanged; + + private boolean lastAccessedTimeChanged; + + private boolean maxInactiveIntervalChanged; + + private String originalId; + + private Map delta = new HashMap<>(); + + /** + * Creates a new instance ensuring to mark all of the new attributes to be + * persisted in the next save operation. + */ + HazelcastSession() { + this(new MapSession()); + this.isNew = true; + flushImmediateIfNecessary().subscribe(); + } + + /** + * Creates a new instance from the provided {@link MapSession}. + * @param cached the {@link MapSession} that represents the persisted session that + * was retrieved. Cannot be {@code null}. + */ + HazelcastSession(MapSession cached) { + Assert.notNull(cached, "MapSession cannot be null"); + this.delegate = cached; + this.originalId = cached.getId(); + } + + @Override + public void setLastAccessedTime(Instant lastAccessedTime) { + this.delegate.setLastAccessedTime(lastAccessedTime); + this.lastAccessedTimeChanged = true; + flushImmediateIfNecessary().subscribe(); + } + + @Override + public boolean isExpired() { + return this.delegate.isExpired(); + } + + @Override + public Instant getCreationTime() { + return this.delegate.getCreationTime(); + } + + @Override + public String getId() { + return this.delegate.getId(); + } + + @Override + public String changeSessionId() { + String newSessionId = this.delegate.changeSessionId(); + this.sessionIdChanged = true; + return newSessionId; + } + + @Override + public Instant getLastAccessedTime() { + return this.delegate.getLastAccessedTime(); + } + + @Override + public void setMaxInactiveInterval(Duration interval) { + this.delegate.setMaxInactiveInterval(interval); + this.maxInactiveIntervalChanged = true; + flushImmediateIfNecessary().subscribe(); + } + + @Override + public Duration getMaxInactiveInterval() { + return this.delegate.getMaxInactiveInterval(); + } + + @Override + public T getAttribute(String attributeName) { + return this.delegate.getAttribute(attributeName); + } + + @Override + public Set getAttributeNames() { + return this.delegate.getAttributeNames(); + } + + @Override + public void setAttribute(String attributeName, Object attributeValue) { + this.delegate.setAttribute(attributeName, attributeValue); + this.delta.put(attributeName, attributeValue); + flushImmediateIfNecessary().subscribe(); + } + + @Override + public void removeAttribute(String attributeName) { + this.delegate.removeAttribute(attributeName); + this.delta.put(attributeName, null); + flushImmediateIfNecessary().subscribe(); + } + + MapSession getDelegate() { + return this.delegate; + } + + boolean hasChanges() { + return (this.lastAccessedTimeChanged || this.maxInactiveIntervalChanged + || !this.delta.isEmpty()); + } + + void clearChangeFlags() { + this.isNew = false; + this.lastAccessedTimeChanged = false; + this.sessionIdChanged = false; + this.maxInactiveIntervalChanged = false; + this.delta.clear(); + } + + private Mono flushImmediateIfNecessary() { + if (HazelcastReactiveSessionRepository.this.hazelcastFlushMode == HazelcastFlushMode.IMMEDIATE) { + return HazelcastReactiveSessionRepository.this.save(this); + } + + return Mono.empty(); + } + + } + + static final class MonoICompletableFuture extends Mono + implements Fuseable, Scannable { + + final ICompletableFuture future; + + MonoICompletableFuture(ICompletableFuture future) { + this.future = Objects.requireNonNull(future, "future"); + } + + @Override + @SuppressWarnings("unchecked") + public void subscribe(CoreSubscriber actual) { + Operators.MonoSubscriber + sds = new Operators.MonoSubscriber<>(actual); + + actual.onSubscribe(sds); + + if (sds.isCancelled()) { + return; + } + + future.andThen(new ExecutionCallback() { + public void onResponse(Object v) { + try { + if (v != null) { + sds.complete((T) v); + } + else { + actual.onComplete(); + } + } + catch (Throwable e1) { + Operators.onErrorDropped(e1, actual.currentContext()); + throw Exceptions.bubble(e1); + } + } + + public void onFailure(Throwable e) { + try { + if (e != null) { + actual.onError(e); + } + } + catch (Throwable e1) { + Operators.onErrorDropped(e1, actual.currentContext()); + throw Exceptions.bubble(e1); + } + } + }); + } + + @Override + public Object scanUnsafe(Scannable.Attr key) { + return null; //no particular key to be represented, still useful in hooks + } + } + +} diff --git a/spring-session-hazelcast/src/test/java/org/springframework/session/hazelcast/HazelcastReactiveSessionRepositoryTests.java b/spring-session-hazelcast/src/test/java/org/springframework/session/hazelcast/HazelcastReactiveSessionRepositoryTests.java new file mode 100644 index 000000000..dcff5cd3a --- /dev/null +++ b/spring-session-hazelcast/src/test/java/org/springframework/session/hazelcast/HazelcastReactiveSessionRepositoryTests.java @@ -0,0 +1,381 @@ +/* + * Copyright 2014-2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.session.hazelcast; + +import java.time.Duration; +import java.time.Instant; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import com.hazelcast.core.HazelcastInstance; +import com.hazelcast.core.ICompletableFuture; +import com.hazelcast.core.IMap; +import com.hazelcast.internal.serialization.impl.SerializationServiceV1; +import com.hazelcast.map.EntryProcessor; +import com.hazelcast.map.listener.MapListener; +import com.hazelcast.util.executor.CompletedFuture; +import org.junit.Before; +import org.junit.Test; + +import org.springframework.session.MapSession; +import org.springframework.session.hazelcast.HazelcastReactiveSessionRepository.HazelcastSession; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.isA; +import static org.mockito.BDDMockito.given; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyZeroInteractions; + +/** + * Tests for {@link HazelcastSessionRepository}. + * + * @author Vedran Pavic + * @author Aleksandar Stojsavljevic + */ +public class HazelcastReactiveSessionRepositoryTests { + + private static final String SPRING_SECURITY_CONTEXT = "SPRING_SECURITY_CONTEXT"; + + private HazelcastInstance hazelcastInstance = mock(HazelcastInstance.class); + + @SuppressWarnings("unchecked") + private IMap sessions = mock(IMap.class); + + private HazelcastReactiveSessionRepository repository; + + @Before + public void setUp() { + ICompletableFuture mockFuture = mock(ICompletableFuture.class); + given(sessions.setAsync(anyString(), any())).willReturn(mockFuture); + given(sessions.setAsync(anyString(), any(), anyLong(), any())).willReturn(mockFuture); + given(sessions.getAsync(anyString())).willReturn(mockFuture); + given(sessions.submitToKey(anyString(), any())).willReturn(mockFuture); + given(this.hazelcastInstance.getMap(anyString())) + .willReturn(this.sessions); + this.repository = new HazelcastReactiveSessionRepository(this.hazelcastInstance); + this.repository.init(); + } + + @Test + public void constructorNullHazelcastInstance() { + assertThatThrownBy(() -> new HazelcastSessionRepository(null)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("HazelcastInstance must not be null"); + } + + @Test + public void createSessionDefaultMaxInactiveInterval() { + verify(this.sessions, times(1)).addEntryListener(any(MapListener.class), + anyBoolean()); + + HazelcastSession session = this.repository.createSession().block(); + + assertThat(session.getMaxInactiveInterval()) + .isEqualTo(new MapSession().getMaxInactiveInterval()); + verifyZeroInteractions(this.sessions); + } + + @Test + public void createSessionCustomMaxInactiveInterval() { + verify(this.sessions, times(1)).addEntryListener(any(MapListener.class), + anyBoolean()); + + int interval = 1; + this.repository.setDefaultMaxInactiveInterval(interval); + + HazelcastSession session = this.repository.createSession().block(); + + assertThat(session.getMaxInactiveInterval()) + .isEqualTo(Duration.ofSeconds(interval)); + verifyZeroInteractions(this.sessions); + } + + @Test + public void saveNewFlushModeOnSave() { + verify(this.sessions, times(1)).addEntryListener(any(MapListener.class), + anyBoolean()); + + HazelcastSession session = this.repository.createSession().block(); + verifyZeroInteractions(this.sessions); + + this.repository.save(session); + verify(this.sessions, times(1)).setAsync(eq(session.getId()), + eq(session.getDelegate()), isA(Long.class), eq(TimeUnit.SECONDS)); + verifyZeroInteractions(this.sessions); + } + + @Test + public void saveNewFlushModeImmediate() { + verify(this.sessions, times(1)).addEntryListener(any(MapListener.class), + anyBoolean()); + + this.repository.setHazelcastFlushMode(HazelcastFlushMode.IMMEDIATE); + + HazelcastSession session = this.repository.createSession().block(); + verify(this.sessions, times(1)).setAsync(eq(session.getId()), + eq(session.getDelegate()), isA(Long.class), eq(TimeUnit.SECONDS)); + verifyZeroInteractions(this.sessions); + } + + @Test + public void saveUpdatedAttributeFlushModeOnSave() { + verify(this.sessions, times(1)).addEntryListener(any(MapListener.class), + anyBoolean()); + + HazelcastSession session = this.repository.createSession().block(); + session.setAttribute("testName", "testValue"); + verifyZeroInteractions(this.sessions); + + this.repository.save(session); + verify(this.sessions, times(1)).setAsync(eq(session.getId()), + eq(session.getDelegate()), isA(Long.class), eq(TimeUnit.SECONDS)); + verifyZeroInteractions(this.sessions); + } + + @Test + public void saveUpdatedAttributeFlushModeImmediate() { + verify(this.sessions, times(1)).addEntryListener(any(MapListener.class), + anyBoolean()); + + this.repository.setHazelcastFlushMode(HazelcastFlushMode.IMMEDIATE); + + HazelcastSession session = this.repository.createSession().block(); + session.setAttribute("testName", "testValue"); + verify(this.sessions, times(1)).setAsync(eq(session.getId()), + eq(session.getDelegate()), isA(Long.class), eq(TimeUnit.SECONDS)); + verify(this.sessions, times(1)).submitToKey(eq(session.getId()), + any(EntryProcessor.class)); + + this.repository.save(session); + verifyZeroInteractions(this.sessions); + } + + @Test + public void removeAttributeFlushModeOnSave() { + verify(this.sessions, times(1)).addEntryListener(any(MapListener.class), + anyBoolean()); + + HazelcastSession session = this.repository.createSession().block(); + session.removeAttribute("testName"); + verifyZeroInteractions(this.sessions); + + this.repository.save(session); + verify(this.sessions, times(1)).setAsync(eq(session.getId()), + eq(session.getDelegate()), isA(Long.class), eq(TimeUnit.SECONDS)); + verifyZeroInteractions(this.sessions); + } + + @Test + public void removeAttributeFlushModeImmediate() { + verify(this.sessions, times(1)).addEntryListener(any(MapListener.class), + anyBoolean()); + + this.repository.setHazelcastFlushMode(HazelcastFlushMode.IMMEDIATE); + + HazelcastSession session = this.repository.createSession().block(); + session.removeAttribute("testName"); + verify(this.sessions, times(1)).setAsync(eq(session.getId()), + eq(session.getDelegate()), isA(Long.class), eq(TimeUnit.SECONDS)); + verify(this.sessions, times(1)).submitToKey(eq(session.getId()), + any(EntryProcessor.class)); + + this.repository.save(session); + verifyZeroInteractions(this.sessions); + } + + @Test + public void saveUpdatedLastAccessedTimeFlushModeOnSave() { + verify(this.sessions, times(1)).addEntryListener(any(MapListener.class), + anyBoolean()); + + HazelcastSession session = this.repository.createSession().block(); + session.setLastAccessedTime(Instant.now()); + verifyZeroInteractions(this.sessions); + + this.repository.save(session); + verify(this.sessions, times(1)).setAsync(eq(session.getId()), + eq(session.getDelegate()), isA(Long.class), eq(TimeUnit.SECONDS)); + verifyZeroInteractions(this.sessions); + } + + @Test + public void saveUpdatedLastAccessedTimeFlushModeImmediate() { + verify(this.sessions, times(1)).addEntryListener(any(MapListener.class), + anyBoolean()); + + this.repository.setHazelcastFlushMode(HazelcastFlushMode.IMMEDIATE); + + HazelcastSession session = this.repository.createSession().block(); + session.setLastAccessedTime(Instant.now()); + verify(this.sessions, times(1)).setAsync(eq(session.getId()), + eq(session.getDelegate()), isA(Long.class), eq(TimeUnit.SECONDS)); + verify(this.sessions, times(1)).submitToKey(eq(session.getId()), + any(EntryProcessor.class)); + + this.repository.save(session); + verifyZeroInteractions(this.sessions); + } + + @Test + public void saveUpdatedMaxInactiveIntervalInSecondsFlushModeOnSave() { + verify(this.sessions, times(1)).addEntryListener(any(MapListener.class), + anyBoolean()); + + HazelcastSession session = this.repository.createSession().block(); + session.setMaxInactiveInterval(Duration.ofSeconds(1)); + verifyZeroInteractions(this.sessions); + + this.repository.save(session); + verify(this.sessions, times(1)).setAsync(eq(session.getId()), + eq(session.getDelegate()), isA(Long.class), eq(TimeUnit.SECONDS)); + verifyZeroInteractions(this.sessions); + } + + @Test + public void saveUpdatedMaxInactiveIntervalInSecondsFlushModeImmediate() { + verify(this.sessions, times(1)).addEntryListener(any(MapListener.class), + anyBoolean()); + + this.repository.setHazelcastFlushMode(HazelcastFlushMode.IMMEDIATE); + + HazelcastSession session = this.repository.createSession().block(); + session.setMaxInactiveInterval(Duration.ofSeconds(1)); + verify(this.sessions, times(2)).setAsync(eq(session.getId()), + eq(session.getDelegate()), isA(Long.class), eq(TimeUnit.SECONDS)); + verify(this.sessions, times(1)).submitToKey(eq(session.getId()), + any(EntryProcessor.class)); + + this.repository.save(session); + verifyZeroInteractions(this.sessions); + } + + @Test + public void saveUnchangedFlushModeOnSave() { + verify(this.sessions, times(1)).addEntryListener(any(MapListener.class), + anyBoolean()); + + HazelcastSession session = this.repository.createSession().block(); + this.repository.save(session); + verify(this.sessions, times(1)).setAsync(eq(session.getId()), + eq(session.getDelegate()), isA(Long.class), eq(TimeUnit.SECONDS)); + + this.repository.save(session); + verifyZeroInteractions(this.sessions); + } + + @Test + public void saveUnchangedFlushModeImmediate() { + verify(this.sessions, times(1)).addEntryListener(any(MapListener.class), + anyBoolean()); + + this.repository.setHazelcastFlushMode(HazelcastFlushMode.IMMEDIATE); + + HazelcastSession session = this.repository.createSession().block(); + verify(this.sessions, times(1)).setAsync(eq(session.getId()), + eq(session.getDelegate()), isA(Long.class), eq(TimeUnit.SECONDS)); + + this.repository.save(session); + verifyZeroInteractions(this.sessions); + } + + @Test + public void getSessionNotFound() { + verify(this.sessions, times(1)).addEntryListener(any(MapListener.class), + anyBoolean()); + + String sessionId = "testSessionId"; + + HazelcastSession session = this.repository.findById(sessionId).block(); + + assertThat(session).isNull(); + verify(this.sessions, times(1)).getAsync(eq(sessionId)); + verifyZeroInteractions(this.sessions); + } + + @Test + public void getSessionExpired() { + verify(this.sessions, times(1)).addEntryListener(any(MapListener.class), + anyBoolean()); + + MapSession expired = new MapSession(); + expired.setLastAccessedTime(Instant.now() + .minusSeconds(MapSession.DEFAULT_MAX_INACTIVE_INTERVAL_SECONDS + 1)); + given(this.sessions.getAsync(eq(expired.getId()))).willReturn( + new CompletedFuture<>(SerializationServiceV1.builder().build(), expired, Executors.newSingleThreadExecutor())); + + HazelcastSession session = this.repository.findById(expired.getId()).block(); + + assertThat(session).isNull(); + verify(this.sessions, times(1)).getAsync(eq(expired.getId())); + verify(this.sessions, times(1)).removeAsync(eq(expired.getId())); + verifyZeroInteractions(this.sessions); + } + + @Test + public void getSessionFound() { + verify(this.sessions, times(1)).addEntryListener(any(MapListener.class), + anyBoolean()); + + MapSession saved = new MapSession(); + saved.setAttribute("savedName", "savedValue"); + given(this.sessions.getAsync(eq(saved.getId()))).willReturn( + new CompletedFuture<>(SerializationServiceV1.builder().build(), saved, Executors.newSingleThreadExecutor())); + + HazelcastSession session = this.repository.findById(saved.getId()).block(); + + assertThat(session.getId()).isEqualTo(saved.getId()); + assertThat(session.getAttribute("savedName")).isEqualTo("savedValue"); + verify(this.sessions, times(1)).getAsync(eq(saved.getId())); + verifyZeroInteractions(this.sessions); + } + + @Test + public void delete() { + verify(this.sessions, times(1)).addEntryListener(any(MapListener.class), + anyBoolean()); + + String sessionId = "testSessionId"; + + this.repository.deleteById(sessionId).block(); + + verify(this.sessions, times(1)).removeAsync(eq(sessionId)); + verifyZeroInteractions(this.sessions); + } + + + @Test // gh-1120 + public void getAttributeNamesAndRemove() { + HazelcastSession session = this.repository.createSession().block(); + session.setAttribute("attribute1", "value1"); + session.setAttribute("attribute2", "value2"); + + for (String attributeName : session.getAttributeNames()) { + session.removeAttribute(attributeName); + } + + assertThat(session.getAttributeNames()).isEmpty(); + } + +} From a3c3983c5628d8b7961e0eccccb7449b9737cb68 Mon Sep 17 00:00:00 2001 From: Oleh Dokuka Date: Thu, 30 Aug 2018 09:07:48 +0300 Subject: [PATCH 2/2] eliminate applications' events publishing; provide spring configuration --- .../HazelcastReactiveSessionRepository.java | 95 +-------------- .../web/server/EnableHazelcastWebSession.java | 102 ++++++++++++++++ .../HazelcastWebSessionConfiguration.java | 113 ++++++++++++++++++ 3 files changed, 216 insertions(+), 94 deletions(-) create mode 100644 spring-session-hazelcast/src/main/java/org/springframework/session/hazelcast/config/annotation/web/server/EnableHazelcastWebSession.java create mode 100644 spring-session-hazelcast/src/main/java/org/springframework/session/hazelcast/config/annotation/web/server/HazelcastWebSessionConfiguration.java diff --git a/spring-session-hazelcast/src/main/java/org/springframework/session/hazelcast/HazelcastReactiveSessionRepository.java b/spring-session-hazelcast/src/main/java/org/springframework/session/hazelcast/HazelcastReactiveSessionRepository.java index 81dda7585..f7b0b7558 100644 --- a/spring-session-hazelcast/src/main/java/org/springframework/session/hazelcast/HazelcastReactiveSessionRepository.java +++ b/spring-session-hazelcast/src/main/java/org/springframework/session/hazelcast/HazelcastReactiveSessionRepository.java @@ -24,16 +24,11 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import javax.annotation.PostConstruct; -import javax.annotation.PreDestroy; -import com.hazelcast.core.EntryEvent; import com.hazelcast.core.ExecutionCallback; import com.hazelcast.core.HazelcastInstance; import com.hazelcast.core.ICompletableFuture; import com.hazelcast.core.IMap; -import com.hazelcast.map.listener.EntryAddedListener; -import com.hazelcast.map.listener.EntryEvictedListener; -import com.hazelcast.map.listener.EntryRemovedListener; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import reactor.core.CoreSubscriber; @@ -44,15 +39,9 @@ import reactor.core.publisher.Mono; import reactor.core.publisher.Operators; -import org.springframework.context.ApplicationEvent; -import org.springframework.context.ApplicationEventPublisher; import org.springframework.session.MapSession; import org.springframework.session.ReactiveSessionRepository; import org.springframework.session.Session; -import org.springframework.session.events.AbstractSessionEvent; -import org.springframework.session.events.SessionCreatedEvent; -import org.springframework.session.events.SessionDeletedEvent; -import org.springframework.session.events.SessionExpiredEvent; import org.springframework.util.Assert; /** @@ -73,10 +62,6 @@ * new HazelcastSessionRepository(hazelcastInstance); * * - * In order to support finding sessions by principal name using - * {@link #findByIndexNameAndIndexValue(String, String)} method, custom configuration of - * {@code IMap} supplied to this implementation is required. - * * The following snippet demonstrates how to define required configuration using * programmatic Hazelcast Configuration: * @@ -95,16 +80,6 @@ * Hazelcast.newHazelcastInstance(config); * * - * This implementation listens for events on the Hazelcast-backed SessionRepository and - * translates those events into the corresponding Spring Session events. Publish the - * Spring Session events with the given {@link ApplicationEventPublisher}. - * - *
    - *
  • entryAdded - {@link SessionCreatedEvent}
  • - *
  • entryEvicted - {@link SessionExpiredEvent}
  • - *
  • entryRemoved - {@link SessionDeletedEvent}
  • - *
- * * @author Vedran Pavic * @author Tommy Ludwig * @author Mark Anderson @@ -112,9 +87,7 @@ * @since 1.3.0 */ public class HazelcastReactiveSessionRepository implements - ReactiveSessionRepository, - EntryAddedListener, EntryEvictedListener, - EntryRemovedListener { + ReactiveSessionRepository { /** * The default name of map used by Spring Session to store sessions. @@ -125,18 +98,6 @@ public class HazelcastReactiveSessionRepository implements private final HazelcastInstance hazelcastInstance; - private ApplicationEventPublisher eventPublisher = new ApplicationEventPublisher() { - - @Override - public void publishEvent(ApplicationEvent event) { - } - - @Override - public void publishEvent(Object event) { - } - - }; - /** * If non-null, this value is used to override * {@link MapSession#setMaxInactiveInterval(Duration)}. @@ -149,8 +110,6 @@ public void publishEvent(Object event) { private IMap sessions; - private String sessionListenerId; - public HazelcastReactiveSessionRepository(HazelcastInstance hazelcastInstance) { Assert.notNull(hazelcastInstance, "HazelcastInstance must not be null"); this.hazelcastInstance = hazelcastInstance; @@ -159,27 +118,6 @@ public HazelcastReactiveSessionRepository(HazelcastInstance hazelcastInstance) { @PostConstruct public void init() { this.sessions = this.hazelcastInstance.getMap(this.sessionMapName); - this.sessionListenerId = this.sessions.addEntryListener(this, true); - } - - @PreDestroy - public void close() { - this.sessions.removeEntryListener(this.sessionListenerId); - } - - /** - * Sets the {@link ApplicationEventPublisher} that is used to publish - * {@link AbstractSessionEvent session events}. The default is to not publish session - * events. - * - * @param applicationEventPublisher the {@link ApplicationEventPublisher} that is used - * to publish session events. Cannot be null. - */ - public void setApplicationEventPublisher( - ApplicationEventPublisher applicationEventPublisher) { - Assert.notNull(applicationEventPublisher, - "ApplicationEventPublisher cannot be null"); - this.eventPublisher = applicationEventPublisher; } /** @@ -271,37 +209,6 @@ public Mono deleteById(String id) { .then(); } - @Override - public void entryAdded(EntryEvent event) { - MapSession session = event.getValue(); - if (session.getId().equals(session.getOriginalId())) { - if (logger.isDebugEnabled()) { - logger.debug("Session created with id: " + session.getId()); - } - this.eventPublisher.publishEvent(new SessionCreatedEvent(this, session)); - } - } - - @Override - public void entryEvicted(EntryEvent event) { - if (logger.isDebugEnabled()) { - logger.debug("Session expired with id: " + event.getOldValue().getId()); - } - this.eventPublisher - .publishEvent(new SessionExpiredEvent(this, event.getOldValue())); - } - - @Override - public void entryRemoved(EntryEvent event) { - MapSession session = event.getOldValue(); - if (session != null) { - if (logger.isDebugEnabled()) { - logger.debug("Session deleted with id: " + session.getId()); - } - this.eventPublisher.publishEvent(new SessionDeletedEvent(this, session)); - } - } - /** * A custom implementation of {@link Session} that uses a {@link MapSession} as the * basis for its mapping. It keeps track if changes have been made since last save. diff --git a/spring-session-hazelcast/src/main/java/org/springframework/session/hazelcast/config/annotation/web/server/EnableHazelcastWebSession.java b/spring-session-hazelcast/src/main/java/org/springframework/session/hazelcast/config/annotation/web/server/EnableHazelcastWebSession.java new file mode 100644 index 000000000..64f932279 --- /dev/null +++ b/spring-session-hazelcast/src/main/java/org/springframework/session/hazelcast/config/annotation/web/server/EnableHazelcastWebSession.java @@ -0,0 +1,102 @@ +/* + * Copyright 2014-2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.session.hazelcast.config.annotation.web.server; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +import com.hazelcast.core.HazelcastInstance; + +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Import; +import org.springframework.session.MapSession; +import org.springframework.session.Session; +import org.springframework.session.SessionRepository; +import org.springframework.session.config.annotation.web.http.EnableSpringHttpSession; +import org.springframework.session.hazelcast.HazelcastFlushMode; +import org.springframework.session.hazelcast.HazelcastReactiveSessionRepository; +import org.springframework.session.hazelcast.HazelcastSessionRepository; +import org.springframework.session.web.http.SessionRepositoryFilter; + +/** + * Add this annotation to an {@code @Configuration} class to expose the + * {@link SessionRepositoryFilter} as a bean named {@code springSessionRepositoryFilter} + * and backed by Hazelcast. In order to leverage the annotation, a single + * {@link HazelcastInstance} must be provided. For example: + * + *
+ * @Configuration
+ * @EnableHazelcastHttpSession
+ * public class HazelcastHttpSessionConfig {
+ *
+ *     @Bean
+ *     public HazelcastInstance embeddedHazelcast() {
+ *         Config hazelcastConfig = new Config();
+ *         return Hazelcast.newHazelcastInstance(hazelcastConfig);
+ *     }
+ *
+ * }
+ * 
+ * + * More advanced configurations can extend {@link HazelcastWebSessionConfiguration} + * instead. + * + * @author Tommy Ludwig + * @author Aleksandar Stojsavljevic + * @author Vedran Pavic + * @since 1.1 + * @see EnableSpringHttpSession + */ +@Retention(RetentionPolicy.RUNTIME) +@Target(ElementType.TYPE) +@Documented +@Import(HazelcastWebSessionConfiguration.class) +@Configuration +public @interface EnableHazelcastWebSession { + + /** + * The session timeout in seconds. By default, it is set to 1800 seconds (30 minutes). + * This should be a non-negative integer. + * @return the seconds a session can be inactive before expiring + */ + int maxInactiveIntervalInSeconds() default MapSession.DEFAULT_MAX_INACTIVE_INTERVAL_SECONDS; + + /** + * This is the name of the Map that will be used in Hazelcast to store the session + * data. Default is + * {@link HazelcastSessionRepository#DEFAULT_SESSION_MAP_NAME}. + * @return the name of the Map to store the sessions in Hazelcast + */ + String sessionMapName() default HazelcastReactiveSessionRepository.DEFAULT_SESSION_MAP_NAME; + + /** + * Flush mode for the Hazelcast sessions. The default is {@code ON_SAVE} which only + * updates the backing Hazelcast when {@link SessionRepository#save(Session)} is + * invoked. In a web environment this happens just before the HTTP response is + * committed. + *

+ * Setting the value to {@code IMMEDIATE} will ensure that the any updates to the + * Session are immediately written to the Hazelcast instance. + * @return the {@link HazelcastFlushMode} to use + * @since 1.3.0 + */ + HazelcastFlushMode hazelcastFlushMode() default HazelcastFlushMode.ON_SAVE; + +} diff --git a/spring-session-hazelcast/src/main/java/org/springframework/session/hazelcast/config/annotation/web/server/HazelcastWebSessionConfiguration.java b/spring-session-hazelcast/src/main/java/org/springframework/session/hazelcast/config/annotation/web/server/HazelcastWebSessionConfiguration.java new file mode 100644 index 000000000..5ea30d589 --- /dev/null +++ b/spring-session-hazelcast/src/main/java/org/springframework/session/hazelcast/config/annotation/web/server/HazelcastWebSessionConfiguration.java @@ -0,0 +1,113 @@ +/* + * Copyright 2014-2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.session.hazelcast.config.annotation.web.server; + +import java.util.Map; + +import com.hazelcast.core.HazelcastInstance; + +import org.springframework.beans.factory.ObjectProvider; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ApplicationEventPublisher; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.ImportAware; +import org.springframework.core.annotation.AnnotationAttributes; +import org.springframework.core.type.AnnotationMetadata; +import org.springframework.session.MapSession; +import org.springframework.session.config.annotation.web.server.SpringWebSessionConfiguration; +import org.springframework.session.hazelcast.HazelcastFlushMode; +import org.springframework.session.hazelcast.HazelcastReactiveSessionRepository; +import org.springframework.session.hazelcast.HazelcastSessionRepository; +import org.springframework.session.hazelcast.config.annotation.SpringSessionHazelcastInstance; +import org.springframework.session.web.http.SessionRepositoryFilter; +import org.springframework.util.StringUtils; + +/** + * Exposes the {@link SessionRepositoryFilter} as a bean named + * {@code springSessionRepositoryFilter}. In order to use this a single + * {@link HazelcastInstance} must be exposed as a Bean. + * + * @author Tommy Ludwig + * @author Vedran Pavic + * @since 1.1 + * @see EnableHazelcastWebSession + */ +@Configuration +public class HazelcastWebSessionConfiguration extends SpringWebSessionConfiguration + implements ImportAware { + + private Integer maxInactiveIntervalInSeconds = MapSession.DEFAULT_MAX_INACTIVE_INTERVAL_SECONDS; + + private String sessionMapName = HazelcastSessionRepository.DEFAULT_SESSION_MAP_NAME; + + private HazelcastFlushMode hazelcastFlushMode = HazelcastFlushMode.ON_SAVE; + + private HazelcastInstance hazelcastInstance; + + @Bean + public HazelcastReactiveSessionRepository reactiveSessionRepository() { + HazelcastReactiveSessionRepository reactiveSessionRepository = new HazelcastReactiveSessionRepository( + this.hazelcastInstance); + if (StringUtils.hasText(this.sessionMapName)) { + reactiveSessionRepository.setSessionMapName(this.sessionMapName); + } + reactiveSessionRepository + .setDefaultMaxInactiveInterval(this.maxInactiveIntervalInSeconds); + reactiveSessionRepository.setHazelcastFlushMode(this.hazelcastFlushMode); + return reactiveSessionRepository; + } + + public void setMaxInactiveIntervalInSeconds(int maxInactiveIntervalInSeconds) { + this.maxInactiveIntervalInSeconds = maxInactiveIntervalInSeconds; + } + + public void setSessionMapName(String sessionMapName) { + this.sessionMapName = sessionMapName; + } + + public void setHazelcastFlushMode(HazelcastFlushMode hazelcastFlushMode) { + this.hazelcastFlushMode = hazelcastFlushMode; + } + + @Autowired + public void setHazelcastInstance( + @SpringSessionHazelcastInstance ObjectProvider springSessionHazelcastInstance, + ObjectProvider hazelcastInstance) { + HazelcastInstance hazelcastInstanceToUse = springSessionHazelcastInstance + .getIfAvailable(); + if (hazelcastInstanceToUse == null) { + hazelcastInstanceToUse = hazelcastInstance.getObject(); + } + this.hazelcastInstance = hazelcastInstanceToUse; + } + + @Override + public void setImportMetadata(AnnotationMetadata importMetadata) { + Map attributeMap = importMetadata + .getAnnotationAttributes(EnableHazelcastWebSession.class.getName()); + AnnotationAttributes attributes = AnnotationAttributes.fromMap(attributeMap); + this.maxInactiveIntervalInSeconds = + attributes.getNumber("maxInactiveIntervalInSeconds"); + String sessionMapNameValue = attributes.getString("sessionMapName"); + if (StringUtils.hasText(sessionMapNameValue)) { + this.sessionMapName = sessionMapNameValue; + } + this.hazelcastFlushMode = attributes.getEnum("hazelcastFlushMode"); + } + +}