+ sds = new Operators.MonoSubscriber<>(actual);
+
+ actual.onSubscribe(sds);
+
+ if (sds.isCancelled()) {
+ return;
+ }
+
+ future.andThen(new ExecutionCallback() {
+ public void onResponse(Object v) {
+ try {
+ if (v != null) {
+ sds.complete((T) v);
+ }
+ else {
+ actual.onComplete();
+ }
+ }
+ catch (Throwable e1) {
+ Operators.onErrorDropped(e1, actual.currentContext());
+ throw Exceptions.bubble(e1);
+ }
+ }
+
+ public void onFailure(Throwable e) {
+ try {
+ if (e != null) {
+ actual.onError(e);
+ }
+ }
+ catch (Throwable e1) {
+ Operators.onErrorDropped(e1, actual.currentContext());
+ throw Exceptions.bubble(e1);
+ }
+ }
+ });
+ }
+
+ @Override
+ public Object scanUnsafe(Scannable.Attr key) {
+ return null; //no particular key to be represented, still useful in hooks
+ }
+ }
+
+}
diff --git a/spring-session-hazelcast/src/main/java/org/springframework/session/hazelcast/config/annotation/web/server/EnableHazelcastWebSession.java b/spring-session-hazelcast/src/main/java/org/springframework/session/hazelcast/config/annotation/web/server/EnableHazelcastWebSession.java
new file mode 100644
index 000000000..64f932279
--- /dev/null
+++ b/spring-session-hazelcast/src/main/java/org/springframework/session/hazelcast/config/annotation/web/server/EnableHazelcastWebSession.java
@@ -0,0 +1,102 @@
+/*
+ * Copyright 2014-2017 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.session.hazelcast.config.annotation.web.server;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+import com.hazelcast.core.HazelcastInstance;
+
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Import;
+import org.springframework.session.MapSession;
+import org.springframework.session.Session;
+import org.springframework.session.SessionRepository;
+import org.springframework.session.config.annotation.web.http.EnableSpringHttpSession;
+import org.springframework.session.hazelcast.HazelcastFlushMode;
+import org.springframework.session.hazelcast.HazelcastReactiveSessionRepository;
+import org.springframework.session.hazelcast.HazelcastSessionRepository;
+import org.springframework.session.web.http.SessionRepositoryFilter;
+
+/**
+ * Add this annotation to an {@code @Configuration} class to expose the
+ * {@link SessionRepositoryFilter} as a bean named {@code springSessionRepositoryFilter}
+ * and backed by Hazelcast. In order to leverage the annotation, a single
+ * {@link HazelcastInstance} must be provided. For example:
+ *
+ *
+ * @Configuration
+ * @EnableHazelcastHttpSession
+ * public class HazelcastHttpSessionConfig {
+ *
+ * @Bean
+ * public HazelcastInstance embeddedHazelcast() {
+ * Config hazelcastConfig = new Config();
+ * return Hazelcast.newHazelcastInstance(hazelcastConfig);
+ * }
+ *
+ * }
+ *
+ *
+ * More advanced configurations can extend {@link HazelcastWebSessionConfiguration}
+ * instead.
+ *
+ * @author Tommy Ludwig
+ * @author Aleksandar Stojsavljevic
+ * @author Vedran Pavic
+ * @since 1.1
+ * @see EnableSpringHttpSession
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.TYPE)
+@Documented
+@Import(HazelcastWebSessionConfiguration.class)
+@Configuration
+public @interface EnableHazelcastWebSession {
+
+ /**
+ * The session timeout in seconds. By default, it is set to 1800 seconds (30 minutes).
+ * This should be a non-negative integer.
+ * @return the seconds a session can be inactive before expiring
+ */
+ int maxInactiveIntervalInSeconds() default MapSession.DEFAULT_MAX_INACTIVE_INTERVAL_SECONDS;
+
+ /**
+ * This is the name of the Map that will be used in Hazelcast to store the session
+ * data. Default is
+ * {@link HazelcastSessionRepository#DEFAULT_SESSION_MAP_NAME}.
+ * @return the name of the Map to store the sessions in Hazelcast
+ */
+ String sessionMapName() default HazelcastReactiveSessionRepository.DEFAULT_SESSION_MAP_NAME;
+
+ /**
+ * Flush mode for the Hazelcast sessions. The default is {@code ON_SAVE} which only
+ * updates the backing Hazelcast when {@link SessionRepository#save(Session)} is
+ * invoked. In a web environment this happens just before the HTTP response is
+ * committed.
+ *
+ * Setting the value to {@code IMMEDIATE} will ensure that the any updates to the
+ * Session are immediately written to the Hazelcast instance.
+ * @return the {@link HazelcastFlushMode} to use
+ * @since 1.3.0
+ */
+ HazelcastFlushMode hazelcastFlushMode() default HazelcastFlushMode.ON_SAVE;
+
+}
diff --git a/spring-session-hazelcast/src/main/java/org/springframework/session/hazelcast/config/annotation/web/server/HazelcastWebSessionConfiguration.java b/spring-session-hazelcast/src/main/java/org/springframework/session/hazelcast/config/annotation/web/server/HazelcastWebSessionConfiguration.java
new file mode 100644
index 000000000..5ea30d589
--- /dev/null
+++ b/spring-session-hazelcast/src/main/java/org/springframework/session/hazelcast/config/annotation/web/server/HazelcastWebSessionConfiguration.java
@@ -0,0 +1,113 @@
+/*
+ * Copyright 2014-2017 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.session.hazelcast.config.annotation.web.server;
+
+import java.util.Map;
+
+import com.hazelcast.core.HazelcastInstance;
+
+import org.springframework.beans.factory.ObjectProvider;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.ApplicationEventPublisher;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.ImportAware;
+import org.springframework.core.annotation.AnnotationAttributes;
+import org.springframework.core.type.AnnotationMetadata;
+import org.springframework.session.MapSession;
+import org.springframework.session.config.annotation.web.server.SpringWebSessionConfiguration;
+import org.springframework.session.hazelcast.HazelcastFlushMode;
+import org.springframework.session.hazelcast.HazelcastReactiveSessionRepository;
+import org.springframework.session.hazelcast.HazelcastSessionRepository;
+import org.springframework.session.hazelcast.config.annotation.SpringSessionHazelcastInstance;
+import org.springframework.session.web.http.SessionRepositoryFilter;
+import org.springframework.util.StringUtils;
+
+/**
+ * Exposes the {@link SessionRepositoryFilter} as a bean named
+ * {@code springSessionRepositoryFilter}. In order to use this a single
+ * {@link HazelcastInstance} must be exposed as a Bean.
+ *
+ * @author Tommy Ludwig
+ * @author Vedran Pavic
+ * @since 1.1
+ * @see EnableHazelcastWebSession
+ */
+@Configuration
+public class HazelcastWebSessionConfiguration extends SpringWebSessionConfiguration
+ implements ImportAware {
+
+ private Integer maxInactiveIntervalInSeconds = MapSession.DEFAULT_MAX_INACTIVE_INTERVAL_SECONDS;
+
+ private String sessionMapName = HazelcastSessionRepository.DEFAULT_SESSION_MAP_NAME;
+
+ private HazelcastFlushMode hazelcastFlushMode = HazelcastFlushMode.ON_SAVE;
+
+ private HazelcastInstance hazelcastInstance;
+
+ @Bean
+ public HazelcastReactiveSessionRepository reactiveSessionRepository() {
+ HazelcastReactiveSessionRepository reactiveSessionRepository = new HazelcastReactiveSessionRepository(
+ this.hazelcastInstance);
+ if (StringUtils.hasText(this.sessionMapName)) {
+ reactiveSessionRepository.setSessionMapName(this.sessionMapName);
+ }
+ reactiveSessionRepository
+ .setDefaultMaxInactiveInterval(this.maxInactiveIntervalInSeconds);
+ reactiveSessionRepository.setHazelcastFlushMode(this.hazelcastFlushMode);
+ return reactiveSessionRepository;
+ }
+
+ public void setMaxInactiveIntervalInSeconds(int maxInactiveIntervalInSeconds) {
+ this.maxInactiveIntervalInSeconds = maxInactiveIntervalInSeconds;
+ }
+
+ public void setSessionMapName(String sessionMapName) {
+ this.sessionMapName = sessionMapName;
+ }
+
+ public void setHazelcastFlushMode(HazelcastFlushMode hazelcastFlushMode) {
+ this.hazelcastFlushMode = hazelcastFlushMode;
+ }
+
+ @Autowired
+ public void setHazelcastInstance(
+ @SpringSessionHazelcastInstance ObjectProvider springSessionHazelcastInstance,
+ ObjectProvider hazelcastInstance) {
+ HazelcastInstance hazelcastInstanceToUse = springSessionHazelcastInstance
+ .getIfAvailable();
+ if (hazelcastInstanceToUse == null) {
+ hazelcastInstanceToUse = hazelcastInstance.getObject();
+ }
+ this.hazelcastInstance = hazelcastInstanceToUse;
+ }
+
+ @Override
+ public void setImportMetadata(AnnotationMetadata importMetadata) {
+ Map attributeMap = importMetadata
+ .getAnnotationAttributes(EnableHazelcastWebSession.class.getName());
+ AnnotationAttributes attributes = AnnotationAttributes.fromMap(attributeMap);
+ this.maxInactiveIntervalInSeconds =
+ attributes.getNumber("maxInactiveIntervalInSeconds");
+ String sessionMapNameValue = attributes.getString("sessionMapName");
+ if (StringUtils.hasText(sessionMapNameValue)) {
+ this.sessionMapName = sessionMapNameValue;
+ }
+ this.hazelcastFlushMode = attributes.getEnum("hazelcastFlushMode");
+ }
+
+}
diff --git a/spring-session-hazelcast/src/test/java/org/springframework/session/hazelcast/HazelcastReactiveSessionRepositoryTests.java b/spring-session-hazelcast/src/test/java/org/springframework/session/hazelcast/HazelcastReactiveSessionRepositoryTests.java
new file mode 100644
index 000000000..dcff5cd3a
--- /dev/null
+++ b/spring-session-hazelcast/src/test/java/org/springframework/session/hazelcast/HazelcastReactiveSessionRepositoryTests.java
@@ -0,0 +1,381 @@
+/*
+ * Copyright 2014-2018 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.session.hazelcast;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import com.hazelcast.core.HazelcastInstance;
+import com.hazelcast.core.ICompletableFuture;
+import com.hazelcast.core.IMap;
+import com.hazelcast.internal.serialization.impl.SerializationServiceV1;
+import com.hazelcast.map.EntryProcessor;
+import com.hazelcast.map.listener.MapListener;
+import com.hazelcast.util.executor.CompletedFuture;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.springframework.session.MapSession;
+import org.springframework.session.hazelcast.HazelcastReactiveSessionRepository.HazelcastSession;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.isA;
+import static org.mockito.BDDMockito.given;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
+
+/**
+ * Tests for {@link HazelcastSessionRepository}.
+ *
+ * @author Vedran Pavic
+ * @author Aleksandar Stojsavljevic
+ */
+public class HazelcastReactiveSessionRepositoryTests {
+
+ private static final String SPRING_SECURITY_CONTEXT = "SPRING_SECURITY_CONTEXT";
+
+ private HazelcastInstance hazelcastInstance = mock(HazelcastInstance.class);
+
+ @SuppressWarnings("unchecked")
+ private IMap sessions = mock(IMap.class);
+
+ private HazelcastReactiveSessionRepository repository;
+
+ @Before
+ public void setUp() {
+ ICompletableFuture mockFuture = mock(ICompletableFuture.class);
+ given(sessions.setAsync(anyString(), any())).willReturn(mockFuture);
+ given(sessions.setAsync(anyString(), any(), anyLong(), any())).willReturn(mockFuture);
+ given(sessions.getAsync(anyString())).willReturn(mockFuture);
+ given(sessions.submitToKey(anyString(), any())).willReturn(mockFuture);
+ given(this.hazelcastInstance.getMap(anyString()))
+ .willReturn(this.sessions);
+ this.repository = new HazelcastReactiveSessionRepository(this.hazelcastInstance);
+ this.repository.init();
+ }
+
+ @Test
+ public void constructorNullHazelcastInstance() {
+ assertThatThrownBy(() -> new HazelcastSessionRepository(null))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("HazelcastInstance must not be null");
+ }
+
+ @Test
+ public void createSessionDefaultMaxInactiveInterval() {
+ verify(this.sessions, times(1)).addEntryListener(any(MapListener.class),
+ anyBoolean());
+
+ HazelcastSession session = this.repository.createSession().block();
+
+ assertThat(session.getMaxInactiveInterval())
+ .isEqualTo(new MapSession().getMaxInactiveInterval());
+ verifyZeroInteractions(this.sessions);
+ }
+
+ @Test
+ public void createSessionCustomMaxInactiveInterval() {
+ verify(this.sessions, times(1)).addEntryListener(any(MapListener.class),
+ anyBoolean());
+
+ int interval = 1;
+ this.repository.setDefaultMaxInactiveInterval(interval);
+
+ HazelcastSession session = this.repository.createSession().block();
+
+ assertThat(session.getMaxInactiveInterval())
+ .isEqualTo(Duration.ofSeconds(interval));
+ verifyZeroInteractions(this.sessions);
+ }
+
+ @Test
+ public void saveNewFlushModeOnSave() {
+ verify(this.sessions, times(1)).addEntryListener(any(MapListener.class),
+ anyBoolean());
+
+ HazelcastSession session = this.repository.createSession().block();
+ verifyZeroInteractions(this.sessions);
+
+ this.repository.save(session);
+ verify(this.sessions, times(1)).setAsync(eq(session.getId()),
+ eq(session.getDelegate()), isA(Long.class), eq(TimeUnit.SECONDS));
+ verifyZeroInteractions(this.sessions);
+ }
+
+ @Test
+ public void saveNewFlushModeImmediate() {
+ verify(this.sessions, times(1)).addEntryListener(any(MapListener.class),
+ anyBoolean());
+
+ this.repository.setHazelcastFlushMode(HazelcastFlushMode.IMMEDIATE);
+
+ HazelcastSession session = this.repository.createSession().block();
+ verify(this.sessions, times(1)).setAsync(eq(session.getId()),
+ eq(session.getDelegate()), isA(Long.class), eq(TimeUnit.SECONDS));
+ verifyZeroInteractions(this.sessions);
+ }
+
+ @Test
+ public void saveUpdatedAttributeFlushModeOnSave() {
+ verify(this.sessions, times(1)).addEntryListener(any(MapListener.class),
+ anyBoolean());
+
+ HazelcastSession session = this.repository.createSession().block();
+ session.setAttribute("testName", "testValue");
+ verifyZeroInteractions(this.sessions);
+
+ this.repository.save(session);
+ verify(this.sessions, times(1)).setAsync(eq(session.getId()),
+ eq(session.getDelegate()), isA(Long.class), eq(TimeUnit.SECONDS));
+ verifyZeroInteractions(this.sessions);
+ }
+
+ @Test
+ public void saveUpdatedAttributeFlushModeImmediate() {
+ verify(this.sessions, times(1)).addEntryListener(any(MapListener.class),
+ anyBoolean());
+
+ this.repository.setHazelcastFlushMode(HazelcastFlushMode.IMMEDIATE);
+
+ HazelcastSession session = this.repository.createSession().block();
+ session.setAttribute("testName", "testValue");
+ verify(this.sessions, times(1)).setAsync(eq(session.getId()),
+ eq(session.getDelegate()), isA(Long.class), eq(TimeUnit.SECONDS));
+ verify(this.sessions, times(1)).submitToKey(eq(session.getId()),
+ any(EntryProcessor.class));
+
+ this.repository.save(session);
+ verifyZeroInteractions(this.sessions);
+ }
+
+ @Test
+ public void removeAttributeFlushModeOnSave() {
+ verify(this.sessions, times(1)).addEntryListener(any(MapListener.class),
+ anyBoolean());
+
+ HazelcastSession session = this.repository.createSession().block();
+ session.removeAttribute("testName");
+ verifyZeroInteractions(this.sessions);
+
+ this.repository.save(session);
+ verify(this.sessions, times(1)).setAsync(eq(session.getId()),
+ eq(session.getDelegate()), isA(Long.class), eq(TimeUnit.SECONDS));
+ verifyZeroInteractions(this.sessions);
+ }
+
+ @Test
+ public void removeAttributeFlushModeImmediate() {
+ verify(this.sessions, times(1)).addEntryListener(any(MapListener.class),
+ anyBoolean());
+
+ this.repository.setHazelcastFlushMode(HazelcastFlushMode.IMMEDIATE);
+
+ HazelcastSession session = this.repository.createSession().block();
+ session.removeAttribute("testName");
+ verify(this.sessions, times(1)).setAsync(eq(session.getId()),
+ eq(session.getDelegate()), isA(Long.class), eq(TimeUnit.SECONDS));
+ verify(this.sessions, times(1)).submitToKey(eq(session.getId()),
+ any(EntryProcessor.class));
+
+ this.repository.save(session);
+ verifyZeroInteractions(this.sessions);
+ }
+
+ @Test
+ public void saveUpdatedLastAccessedTimeFlushModeOnSave() {
+ verify(this.sessions, times(1)).addEntryListener(any(MapListener.class),
+ anyBoolean());
+
+ HazelcastSession session = this.repository.createSession().block();
+ session.setLastAccessedTime(Instant.now());
+ verifyZeroInteractions(this.sessions);
+
+ this.repository.save(session);
+ verify(this.sessions, times(1)).setAsync(eq(session.getId()),
+ eq(session.getDelegate()), isA(Long.class), eq(TimeUnit.SECONDS));
+ verifyZeroInteractions(this.sessions);
+ }
+
+ @Test
+ public void saveUpdatedLastAccessedTimeFlushModeImmediate() {
+ verify(this.sessions, times(1)).addEntryListener(any(MapListener.class),
+ anyBoolean());
+
+ this.repository.setHazelcastFlushMode(HazelcastFlushMode.IMMEDIATE);
+
+ HazelcastSession session = this.repository.createSession().block();
+ session.setLastAccessedTime(Instant.now());
+ verify(this.sessions, times(1)).setAsync(eq(session.getId()),
+ eq(session.getDelegate()), isA(Long.class), eq(TimeUnit.SECONDS));
+ verify(this.sessions, times(1)).submitToKey(eq(session.getId()),
+ any(EntryProcessor.class));
+
+ this.repository.save(session);
+ verifyZeroInteractions(this.sessions);
+ }
+
+ @Test
+ public void saveUpdatedMaxInactiveIntervalInSecondsFlushModeOnSave() {
+ verify(this.sessions, times(1)).addEntryListener(any(MapListener.class),
+ anyBoolean());
+
+ HazelcastSession session = this.repository.createSession().block();
+ session.setMaxInactiveInterval(Duration.ofSeconds(1));
+ verifyZeroInteractions(this.sessions);
+
+ this.repository.save(session);
+ verify(this.sessions, times(1)).setAsync(eq(session.getId()),
+ eq(session.getDelegate()), isA(Long.class), eq(TimeUnit.SECONDS));
+ verifyZeroInteractions(this.sessions);
+ }
+
+ @Test
+ public void saveUpdatedMaxInactiveIntervalInSecondsFlushModeImmediate() {
+ verify(this.sessions, times(1)).addEntryListener(any(MapListener.class),
+ anyBoolean());
+
+ this.repository.setHazelcastFlushMode(HazelcastFlushMode.IMMEDIATE);
+
+ HazelcastSession session = this.repository.createSession().block();
+ session.setMaxInactiveInterval(Duration.ofSeconds(1));
+ verify(this.sessions, times(2)).setAsync(eq(session.getId()),
+ eq(session.getDelegate()), isA(Long.class), eq(TimeUnit.SECONDS));
+ verify(this.sessions, times(1)).submitToKey(eq(session.getId()),
+ any(EntryProcessor.class));
+
+ this.repository.save(session);
+ verifyZeroInteractions(this.sessions);
+ }
+
+ @Test
+ public void saveUnchangedFlushModeOnSave() {
+ verify(this.sessions, times(1)).addEntryListener(any(MapListener.class),
+ anyBoolean());
+
+ HazelcastSession session = this.repository.createSession().block();
+ this.repository.save(session);
+ verify(this.sessions, times(1)).setAsync(eq(session.getId()),
+ eq(session.getDelegate()), isA(Long.class), eq(TimeUnit.SECONDS));
+
+ this.repository.save(session);
+ verifyZeroInteractions(this.sessions);
+ }
+
+ @Test
+ public void saveUnchangedFlushModeImmediate() {
+ verify(this.sessions, times(1)).addEntryListener(any(MapListener.class),
+ anyBoolean());
+
+ this.repository.setHazelcastFlushMode(HazelcastFlushMode.IMMEDIATE);
+
+ HazelcastSession session = this.repository.createSession().block();
+ verify(this.sessions, times(1)).setAsync(eq(session.getId()),
+ eq(session.getDelegate()), isA(Long.class), eq(TimeUnit.SECONDS));
+
+ this.repository.save(session);
+ verifyZeroInteractions(this.sessions);
+ }
+
+ @Test
+ public void getSessionNotFound() {
+ verify(this.sessions, times(1)).addEntryListener(any(MapListener.class),
+ anyBoolean());
+
+ String sessionId = "testSessionId";
+
+ HazelcastSession session = this.repository.findById(sessionId).block();
+
+ assertThat(session).isNull();
+ verify(this.sessions, times(1)).getAsync(eq(sessionId));
+ verifyZeroInteractions(this.sessions);
+ }
+
+ @Test
+ public void getSessionExpired() {
+ verify(this.sessions, times(1)).addEntryListener(any(MapListener.class),
+ anyBoolean());
+
+ MapSession expired = new MapSession();
+ expired.setLastAccessedTime(Instant.now()
+ .minusSeconds(MapSession.DEFAULT_MAX_INACTIVE_INTERVAL_SECONDS + 1));
+ given(this.sessions.getAsync(eq(expired.getId()))).willReturn(
+ new CompletedFuture<>(SerializationServiceV1.builder().build(), expired, Executors.newSingleThreadExecutor()));
+
+ HazelcastSession session = this.repository.findById(expired.getId()).block();
+
+ assertThat(session).isNull();
+ verify(this.sessions, times(1)).getAsync(eq(expired.getId()));
+ verify(this.sessions, times(1)).removeAsync(eq(expired.getId()));
+ verifyZeroInteractions(this.sessions);
+ }
+
+ @Test
+ public void getSessionFound() {
+ verify(this.sessions, times(1)).addEntryListener(any(MapListener.class),
+ anyBoolean());
+
+ MapSession saved = new MapSession();
+ saved.setAttribute("savedName", "savedValue");
+ given(this.sessions.getAsync(eq(saved.getId()))).willReturn(
+ new CompletedFuture<>(SerializationServiceV1.builder().build(), saved, Executors.newSingleThreadExecutor()));
+
+ HazelcastSession session = this.repository.findById(saved.getId()).block();
+
+ assertThat(session.getId()).isEqualTo(saved.getId());
+ assertThat(session.getAttribute("savedName")).isEqualTo("savedValue");
+ verify(this.sessions, times(1)).getAsync(eq(saved.getId()));
+ verifyZeroInteractions(this.sessions);
+ }
+
+ @Test
+ public void delete() {
+ verify(this.sessions, times(1)).addEntryListener(any(MapListener.class),
+ anyBoolean());
+
+ String sessionId = "testSessionId";
+
+ this.repository.deleteById(sessionId).block();
+
+ verify(this.sessions, times(1)).removeAsync(eq(sessionId));
+ verifyZeroInteractions(this.sessions);
+ }
+
+
+ @Test // gh-1120
+ public void getAttributeNamesAndRemove() {
+ HazelcastSession session = this.repository.createSession().block();
+ session.setAttribute("attribute1", "value1");
+ session.setAttribute("attribute2", "value2");
+
+ for (String attributeName : session.getAttributeNames()) {
+ session.removeAttribute(attributeName);
+ }
+
+ assertThat(session.getAttributeNames()).isEmpty();
+ }
+
+}