diff --git a/.gitignore b/.gitignore
index 20502b0..259bf26 100644
--- a/.gitignore
+++ b/.gitignore
@@ -7,3 +7,5 @@ release.properties
pom.xml.releaseBackup
.DS_Store
pom.xml.asc
+.gradle
+/build
diff --git a/build.gradle b/build.gradle
new file mode 100644
index 0000000..c797b05
--- /dev/null
+++ b/build.gradle
@@ -0,0 +1,21 @@
+apply plugin: 'java'
+apply plugin: 'maven'
+
+group = GROUP
+version = VERSION
+
+description = "SSE EventSource Java Client"
+
+sourceCompatibility = 1.7
+targetCompatibility = 1.7
+
+repositories {
+ maven { url "http://repo.maven.apache.org/maven2" }
+}
+dependencies {
+ compile group: 'org.slf4j', name: 'slf4j-api', version:'1.7.6'
+ compile group: 'org.jboss.netty', name: 'netty', version:'3.2.4.Final'
+ testCompile group: 'org.webbitserver', name: 'webbit', version:'0.1.13'
+ testCompile group: 'junit', name: 'junit', version:'4.8.2'
+ testCompile group: 'org.mockito', name: 'mockito-all', version:'1.8.5'
+}
diff --git a/eventsource-client.iml b/eventsource-client.iml
deleted file mode 100644
index cd369a0..0000000
--- a/eventsource-client.iml
+++ /dev/null
@@ -1,25 +0,0 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
diff --git a/eventsource-client.ipr b/eventsource-client.ipr
deleted file mode 100644
index 95c6fbf..0000000
--- a/eventsource-client.ipr
+++ /dev/null
@@ -1,294 +0,0 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- -
-
-
- -
-
-
- -
-
-
- -
-
-
- -
-
-
-
-
-
- -
-
-
-
-
-
- -
-
-
-
-
-
- -
-
-
-
-
-
- -
-
-
-
-
- -
-
-
-
-
- -
-
-
-
-
- -
-
-
-
-
- -
-
-
-
-
- -
-
-
-
-
- -
-
-
- -
-
-
-
-
- -
-
-
-
-
- -
-
-
-
-
- -
-
-
-
-
- -
-
-
-
-
- -
-
-
- -
-
-
- -
-
-
- -
-
-
- -
-
-
-
-
- -
-
-
- -
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- http://www.w3.org/1999/xhtml
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
diff --git a/gradle.properties b/gradle.properties
new file mode 100644
index 0000000..c0235fb
--- /dev/null
+++ b/gradle.properties
@@ -0,0 +1,22 @@
+# Project-wide Gradle settings.
+
+# IDE (e.g. Android Studio) users:
+# Settings specified in this file will override any Gradle settings
+# configured through the IDE.
+
+# For more details on how to configure your build environment visit
+# http://www.gradle.org/docs/current/userguide/build_environment.html
+
+# Specifies the JVM arguments used for the daemon process.
+# The setting is particularly useful for tweaking memory settings.
+# Default value: -Xmx10248m -XX:MaxPermSize=256m
+# org.gradle.jvmargs=-Xmx2048m -XX:MaxPermSize=512m -XX:+HeapDumpOnOutOfMemoryError -Dfile.encoding=UTF-8
+
+# When configured, Gradle will run in incubating parallel mode.
+# This option should only be used with decoupled projects. More details, visit
+# http://www.gradle.org/docs/current/userguide/multi_project_builds.html#sec:decoupled_projects
+# org.gradle.parallel=true
+
+GROUP=com.github.flocsy
+VERSION=0.2-SNAPSHOT
+ARTIFACT_ID=eventsource-client
diff --git a/pom.xml b/pom.xml
index 1f60e61..c6d35e0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1,17 +1,12 @@
4.0.0
- com.github.aslakhellesoy
+ com.github.flocsy
eventsource-client
${project.artifactId}
A Java EventSource Client
- http://aslakhellesoy.github.com/eventsource-java
- 0.1.2.1
+ http://github.com/flocsy/eventsource-java
+ 0.2-SNAPSHOT
jar
-
- org.sonatype.oss
- oss-parent
- 6
-
BSD License
@@ -19,18 +14,13 @@
repo
-
- scm:git:git://github.com/aslakhellesoy/eventsource-java.git
- scm:git:git@github.com:aslakhellesoy/eventsource-java.git
- git://github.com/aslakhellesoy/eventsource-java.git
-
-
-
- repository.jboss.org
- http://repository.jboss.org/nexus/content/groups/public/
-
-
+
+ org.slf4j
+ slf4j-api
+ 1.7.6
+ compile
+
org.jboss.netty
netty
@@ -67,23 +57,18 @@
1.6
-
- org.apache.maven.plugins
- maven-gpg-plugin
- 1.1
-
- true
-
-
-
- sign-artifacts
- verify
-
- sign
-
-
-
-
+
+
+ nx-releases
+ Nexus Releases
+ ${nexus.releases.url}
+
+
+ nx-snapshots
+ Nexus Snapshots
+ ${nexus.snapshots.url}
+
+
diff --git a/src/main/java/com/github/eventsource/client/EventSource.java b/src/main/java/com/github/eventsource/client/EventSource.java
index 17d8cce..c7d9665 100644
--- a/src/main/java/com/github/eventsource/client/EventSource.java
+++ b/src/main/java/com/github/eventsource/client/EventSource.java
@@ -12,13 +12,15 @@
import org.jboss.netty.handler.codec.frame.Delimiters;
import org.jboss.netty.handler.codec.http.HttpRequestEncoder;
import org.jboss.netty.handler.codec.string.StringDecoder;
+import org.jboss.netty.handler.ssl.SslHandler;
+import javax.net.ssl.SSLEngine;
import java.net.InetSocketAddress;
import java.net.URI;
-import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-public class EventSource {
+public class EventSource implements EventSourceHandler {
public static final long DEFAULT_RECONNECTION_TIME_MILLIS = 2000;
public static final int CONNECTING = 0;
@@ -27,7 +29,9 @@ public class EventSource {
private final ClientBootstrap bootstrap;
private final EventSourceChannelHandler clientHandler;
+ private final EventSourceHandler eventSourceHandler;
+ private URI uri;
private int readyState;
/**
@@ -43,18 +47,37 @@ public class EventSource {
* @param eventSourceHandler receives events
* @see #close()
*/
- public EventSource(Executor executor, long reconnectionTimeMillis, final URI uri, EventSourceHandler eventSourceHandler) {
+ protected EventSource(ExecutorService executor, long reconnectionTimeMillis, final URI uri, EventSourceHandler eventSourceHandler) {
+ this(executor, reconnectionTimeMillis, uri, null, eventSourceHandler);
+ }
+
+ protected EventSource(ExecutorService executor, long reconnectionTimeMillis, final URI uri, final SSLEngineProvider sslEngineProvider, EventSourceHandler eventSourceHandler) {
+ this.eventSourceHandler = eventSourceHandler;
+
bootstrap = new ClientBootstrap(
new NioClientSocketChannelFactory(
Executors.newSingleThreadExecutor(),
Executors.newSingleThreadExecutor()));
- bootstrap.setOption("remoteAddress", new InetSocketAddress(uri.getHost(), uri.getPort()));
+ this.uri = uri;
+
+ bootstrap.setOption("remoteAddress", new InetSocketAddress(uri.getHost(), getPort(uri)));
+
+// clientHandler = new EventSourceChannelHandler(new AsyncEventSourceHandler(executor, eventSourceHandler), reconnectionTimeMillis, bootstrap, uri);
+ // add this class as the event source handler so the connect() call can be intercepted
+ AsyncEventSourceHandler asyncHandler = new AsyncEventSourceHandler(executor, this);
- clientHandler = new EventSourceChannelHandler(new AsyncEventSourceHandler(executor, eventSourceHandler), reconnectionTimeMillis, bootstrap, uri);
+ clientHandler = new EventSourceChannelHandler(asyncHandler, reconnectionTimeMillis, bootstrap, uri);
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = Channels.pipeline();
+
+ if (uri.getScheme().equals("https") && sslEngineProvider != null) {
+ SSLEngine engine = sslEngineProvider.createSSLEngine();
+ engine.setUseClientMode(true);
+ pipeline.addLast("ssl", new SslHandler(engine));
+ }
+
pipeline.addLast("line", new DelimiterBasedFrameDecoder(Integer.MAX_VALUE, Delimiters.lineDelimiter()));
pipeline.addLast("string", new StringDecoder());
@@ -65,25 +88,56 @@ public ChannelPipeline getPipeline() throws Exception {
});
}
- public EventSource(String uri, EventSourceHandler eventSourceHandler) {
- this(URI.create(uri), eventSourceHandler);
+ protected EventSource(String uri, EventSourceHandler eventSourceHandler) {
+ this(uri, null, eventSourceHandler);
+ }
+
+ protected EventSource(String uri, SSLEngineProvider sslEngineProvider, EventSourceHandler eventSourceHandler) {
+ this(URI.create(uri), sslEngineProvider, eventSourceHandler);
}
- public EventSource(URI uri, EventSourceHandler eventSourceHandler) {
- this(Executors.newSingleThreadExecutor(), DEFAULT_RECONNECTION_TIME_MILLIS, uri, eventSourceHandler);
+ protected EventSource(URI uri, EventSourceHandler eventSourceHandler) {
+ this(uri, null, eventSourceHandler);
+ }
+
+ protected EventSource(URI uri, SSLEngineProvider sslEngineProvider, EventSourceHandler eventSourceHandler) {
+ this(Executors.newSingleThreadExecutor(), DEFAULT_RECONNECTION_TIME_MILLIS, uri, sslEngineProvider, eventSourceHandler);
+ }
+
+ /**
+ * Sets a custom HTTP header that will be used when the request is made to establish the SSE channel.
+ *
+ * @param name the HTTP header name
+ * @param value the header value
+ */
+ public void setCustomRequestHeader(String name, String value) {
+ clientHandler.setCustomRequestHeader(name, value);
}
public ChannelFuture connect() {
readyState = CONNECTING;
+
+ //To avoid perpetual "SocketUnresolvedException"
+ bootstrap.setOption("remoteAddress", new InetSocketAddress(uri.getHost(), getPort(uri)));
+
return bootstrap.connect();
}
+ public boolean isConnected() {
+ return (readyState == OPEN);
+ }
+
+ public int getReadyState() {
+ return readyState;
+ }
+
/**
* Close the connection
*
* @return self
*/
public EventSource close() {
+ readyState = CLOSED;
clientHandler.close();
return this;
}
@@ -98,4 +152,38 @@ public EventSource join() throws InterruptedException {
clientHandler.join();
return this;
}
+
+ @Override
+ public void onConnect() throws Exception {
+ // flag the connection as open
+ readyState = OPEN;
+
+ // pass event to the proper handler
+ eventSourceHandler.onConnect();
+ }
+
+ @Override
+ public void onMessage(String event, MessageEvent message) throws Exception {
+ // pass event to the proper handler
+ eventSourceHandler.onMessage(event, message);
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ // pass event to the proper handler
+ eventSourceHandler.onError(t);
+ }
+
+ @Override
+ public void onClosed(boolean willReconnect) {
+ eventSourceHandler.onClosed(willReconnect);
+ }
+
+ static public int getPort(URI uri) {
+ int port = uri.getPort();
+ if (port == -1) {
+ port = (uri.getScheme().equals("https")) ? 443 : 80;
+ }
+ return port;
+ }
}
diff --git a/src/main/java/com/github/eventsource/client/EventSourceHandler.java b/src/main/java/com/github/eventsource/client/EventSourceHandler.java
index b50b3d0..2ec35bf 100644
--- a/src/main/java/com/github/eventsource/client/EventSourceHandler.java
+++ b/src/main/java/com/github/eventsource/client/EventSourceHandler.java
@@ -4,4 +4,5 @@ public interface EventSourceHandler {
void onConnect() throws Exception;
void onMessage(String event, MessageEvent message) throws Exception;
void onError(Throwable t);
+ void onClosed(boolean willReconnect);
}
diff --git a/src/main/java/com/github/eventsource/client/SSLEngineProvider.java b/src/main/java/com/github/eventsource/client/SSLEngineProvider.java
new file mode 100644
index 0000000..9f6ac28
--- /dev/null
+++ b/src/main/java/com/github/eventsource/client/SSLEngineProvider.java
@@ -0,0 +1,7 @@
+package com.github.eventsource.client;
+
+import javax.net.ssl.SSLEngine;
+
+public interface SSLEngineProvider {
+ SSLEngine createSSLEngine();
+}
\ No newline at end of file
diff --git a/src/main/java/com/github/eventsource/client/StoppableEventSource.java b/src/main/java/com/github/eventsource/client/StoppableEventSource.java
new file mode 100644
index 0000000..bf149a0
--- /dev/null
+++ b/src/main/java/com/github/eventsource/client/StoppableEventSource.java
@@ -0,0 +1,78 @@
+package com.github.eventsource.client;
+
+import java.net.URI;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+public class StoppableEventSource extends EventSource {
+ static final String TAG = "StoppableEventSource";
+
+ private final ExecutorService executorService;
+
+// private StoppableEventSource(Executor executor, long reconnectionTimeMillis, final URI uri, final SSLEngineProvider sslEngineProvider, EventSourceHandler eventSourceHandler) {
+// super(executor, reconnectionTimeMillis, uri, sslEngineProvider, eventSourceHandler);
+// }
+//
+// private StoppableEventSource(Executor executor, long reconnectionTimeMillis, final URI uri, EventSourceHandler eventSourceHandler) {
+// this(executor, reconnectionTimeMillis, uri, null, eventSourceHandler);
+// }
+
+ private StoppableEventSource(ExecutorService executorService, long reconnectionTimeMillis, final URI uri, final SSLEngineProvider sslEngineProvider, EventSourceHandler eventSourceHandler) {
+ super(executorService, reconnectionTimeMillis, uri, sslEngineProvider, eventSourceHandler);
+ this.executorService = executorService;
+ }
+
+ public StoppableEventSource(URI uri, SSLEngineProvider sslEngineProvider, EventSourceHandler eventSourceHandler) {
+ this(Executors.newSingleThreadExecutor(), DEFAULT_RECONNECTION_TIME_MILLIS, uri, sslEngineProvider, eventSourceHandler);
+ }
+
+ public StoppableEventSource(String uri, SSLEngineProvider sslEngineProvider, EventSourceHandler eventSourceHandler) {
+ this(URI.create(uri), sslEngineProvider, eventSourceHandler);
+ }
+
+ public StoppableEventSource(String uri, EventSourceHandler eventSourceHandler) {
+ this(uri, null, eventSourceHandler);
+ }
+
+ public StoppableEventSource(URI uri, EventSourceHandler eventSourceHandler) {
+ this(uri, null, eventSourceHandler);
+ }
+
+ @Override
+ protected void finalize() {
+ System.out.println("finalize");
+ }
+
+ public void shutdownNow() {
+ shutdownAndAwaitTermination(1, TimeUnit.SECONDS);
+ }
+
+ public void shutdownAndAwaitTermination(final long timeout, final TimeUnit unit) {
+ close();
+ executorService.shutdown(); // Disable new tasks from being submitted
+ try {
+ // Wait a while for existing tasks to terminate
+ if (!executorService.awaitTermination(timeout, unit)) {
+ executorService.shutdownNow(); // Cancel currently executing tasks
+ // Wait a while for tasks to respond to being cancelled
+ if (!executorService.awaitTermination(timeout, unit)) {
+ System.err.println("Pool did not terminate");
+ }
+ }
+ } catch (InterruptedException ie) {
+ // (Re-)Cancel if current thread also interrupted
+ executorService.shutdownNow();
+ // Preserve interrupt status
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ public boolean isShutdown() {
+ return executorService.isShutdown();
+ }
+
+ public boolean isTerminated() {
+ return executorService.isTerminated();
+ }
+}
diff --git a/src/main/java/com/github/eventsource/client/impl/AsyncEventSourceHandler.java b/src/main/java/com/github/eventsource/client/impl/AsyncEventSourceHandler.java
index 89864f2..1d7297c 100644
--- a/src/main/java/com/github/eventsource/client/impl/AsyncEventSourceHandler.java
+++ b/src/main/java/com/github/eventsource/client/impl/AsyncEventSourceHandler.java
@@ -3,56 +3,78 @@
import com.github.eventsource.client.EventSourceHandler;
import com.github.eventsource.client.MessageEvent;
-import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
public class AsyncEventSourceHandler implements EventSourceHandler {
- private final Executor executor;
+ private final ExecutorService executor;
private final EventSourceHandler eventSourceHandler;
- public AsyncEventSourceHandler(Executor executor, EventSourceHandler eventSourceHandler) {
+ public AsyncEventSourceHandler(ExecutorService executor, EventSourceHandler eventSourceHandler) {
this.executor = executor;
this.eventSourceHandler = eventSourceHandler;
}
@Override
public void onConnect() {
- executor.execute(new Runnable() {
- @Override
- public void run() {
- try {
- eventSourceHandler.onConnect();
- } catch (Exception e) {
- onError(e);
+ if (!executor.isShutdown()) {
+ executor.execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ eventSourceHandler.onConnect();
+ } catch (Exception e) {
+ onError(e);
+ }
}
- }
- });
+ });
+ }
}
@Override
public void onMessage(final String event, final MessageEvent message) {
- executor.execute(new Runnable() {
- @Override
- public void run() {
- try {
- eventSourceHandler.onMessage(event, message);
- } catch (Exception e) {
- onError(e);
+ if (!executor.isShutdown()) {
+ executor.execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ eventSourceHandler.onMessage(event, message);
+ } catch (Exception e) {
+ onError(e);
+ }
}
- }
- });
+ });
+ }
}
@Override
public void onError(final Throwable error) {
- executor.execute(new Runnable() {
- @Override
- public void run() {
- try {
- eventSourceHandler.onError(error);
- } catch (Throwable e) {
- e.printStackTrace();
+ if (!executor.isShutdown()) {
+ executor.execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ eventSourceHandler.onError(error);
+ } catch (Throwable e) {
+ e.printStackTrace();
+ }
}
- }
- });
+ });
+ }
+ }
+
+ @Override
+ public void onClosed(final boolean willReconnect) {
+ if (!executor.isShutdown()) {
+ executor.execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ eventSourceHandler.onClosed(willReconnect);
+ } catch (Exception e) {
+ onError(e);
+ }
+ }
+ });
+ }
}
}
diff --git a/src/main/java/com/github/eventsource/client/impl/EventStreamParser.java b/src/main/java/com/github/eventsource/client/impl/EventStreamParser.java
index e651eec..90d41d2 100644
--- a/src/main/java/com/github/eventsource/client/impl/EventStreamParser.java
+++ b/src/main/java/com/github/eventsource/client/impl/EventStreamParser.java
@@ -3,7 +3,6 @@
import com.github.eventsource.client.EventSourceHandler;
import com.github.eventsource.client.MessageEvent;
-import java.io.StringReader;
import java.util.regex.Pattern;
/**
diff --git a/src/main/java/com/github/eventsource/client/impl/netty/EventSourceChannelHandler.java b/src/main/java/com/github/eventsource/client/impl/netty/EventSourceChannelHandler.java
index 6ad7d7b..53f8b5d 100644
--- a/src/main/java/com/github/eventsource/client/impl/netty/EventSourceChannelHandler.java
+++ b/src/main/java/com/github/eventsource/client/impl/netty/EventSourceChannelHandler.java
@@ -1,11 +1,20 @@
package com.github.eventsource.client.impl.netty;
+import com.github.eventsource.client.EventSource;
import com.github.eventsource.client.EventSourceException;
import com.github.eventsource.client.EventSourceHandler;
import com.github.eventsource.client.impl.ConnectionHandler;
import com.github.eventsource.client.impl.EventStreamParser;
import org.jboss.netty.bootstrap.ClientBootstrap;
-import org.jboss.netty.channel.*;
+
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelEvent;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.ExceptionEvent;
+
import org.jboss.netty.handler.codec.http.DefaultHttpRequest;
import org.jboss.netty.handler.codec.http.HttpHeaders.Names;
import org.jboss.netty.handler.codec.http.HttpMethod;
@@ -17,7 +26,10 @@
import org.jboss.netty.util.TimerTask;
import java.net.ConnectException;
+import java.net.InetSocketAddress;
import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Matcher;
@@ -25,12 +37,13 @@
public class EventSourceChannelHandler extends SimpleChannelUpstreamHandler implements ConnectionHandler {
private static final Pattern STATUS_PATTERN = Pattern.compile("HTTP/1.1 (\\d+) (.*)");
- private static final Pattern CONTENT_TYPE_PATTERN = Pattern.compile("Content-Type: text/event-stream");
+ private static final Pattern CONTENT_TYPE_PATTERN = Pattern.compile("Content-Type: text/event-stream(;.*)?", Pattern.CASE_INSENSITIVE);
private final EventSourceHandler eventSourceHandler;
private final ClientBootstrap bootstrap;
private final URI uri;
private final EventStreamParser messageDispatcher;
+ private final Map customRequestHeaders = new HashMap();
private final Timer timer = new HashedWheelTimer();
private Channel channel;
@@ -57,14 +70,23 @@ public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exc
@Override
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
- HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, uri.toString());
+ final String query = uri.getQuery();
+ final String path = uri.getPath() + (((null != query) && !query.isEmpty()) ? "?" + query : "");
+ final int port = uri.getPort();
+ final String portPostfix = ((port != -1) ? ":" + port : "");
+ final String host = uri.getHost() + portPostfix;
+ final HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, path);
request.addHeader(Names.ACCEPT, "text/event-stream");
- request.addHeader(Names.HOST, uri.getHost());
- request.addHeader(Names.ORIGIN, "http://" + uri.getHost());
+ request.addHeader(Names.HOST, host);
+ request.addHeader(Names.ORIGIN, uri.getScheme() + "://" + host);
request.addHeader(Names.CACHE_CONTROL, "no-cache");
if (lastEventId != null) {
request.addHeader("Last-Event-ID", lastEventId);
}
+ // add any custom headers that have been set
+ for (String name : customRequestHeaders.keySet()) {
+ request.addHeader(name, customRequestHeaders.get(name));
+ }
e.getChannel().write(request);
channel = e.getChannel();
}
@@ -76,6 +98,10 @@ public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e)
@Override
public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
+ if (eventStreamOk) {
+ // call onClosed only if it was successfully opened (and onConnect was called)
+ eventSourceHandler.onClosed(reconnectOnClose);
+ }
if (reconnectOnClose) {
reconnect();
}
@@ -135,7 +161,7 @@ public void setLastEventId(String lastEventId) {
this.lastEventId = lastEventId;
}
- public EventSourceChannelHandler close() {
+ public synchronized EventSourceChannelHandler close() {
reconnectOnClose = false;
if (channel != null) {
channel.close();
@@ -150,16 +176,28 @@ public EventSourceChannelHandler join() throws InterruptedException {
return this;
}
+ /**
+ * Sets a custom HTTP header that will be used when the request is made to establish the SSE channel.
+ *
+ * @param name the HTTP header name
+ * @param value the header value
+ */
+ public void setCustomRequestHeader(String name, String value) {
+ customRequestHeaders.put(name, value);
+ }
+
private void reconnect() {
- if(!reconnecting.get()) {
- reconnecting.set(true);
+ if (reconnecting.compareAndSet(false, true)) {
+ headerDone = false;
+ eventStreamOk = false;
timer.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
reconnecting.set(false);
+ bootstrap.setOption("remoteAddress", new InetSocketAddress(uri.getHost(), EventSource.getPort(uri)));
bootstrap.connect().await();
}
}, reconnectionTimeMillis, TimeUnit.MILLISECONDS);
}
}
-}
\ No newline at end of file
+}
diff --git a/src/main/java/com/github/eventsource/client/stubs/StubHandler.java b/src/main/java/com/github/eventsource/client/stubs/StubHandler.java
index 5977afb..01856db 100644
--- a/src/main/java/com/github/eventsource/client/stubs/StubHandler.java
+++ b/src/main/java/com/github/eventsource/client/stubs/StubHandler.java
@@ -32,6 +32,11 @@ public void onConnect() throws Exception {
connected = true;
}
+ @Override
+ public void onClosed(boolean willReconnect){
+ connected = false;
+ }
+
@Override
public void onMessage(String event, MessageEvent message) throws Exception {
getMessageEvents(event).add(message);
diff --git a/src/test/java/com/github/eventsource/client/DebugClient.java b/src/test/java/com/github/eventsource/client/DebugClient.java
index 3c11876..5ce75fb 100644
--- a/src/test/java/com/github/eventsource/client/DebugClient.java
+++ b/src/test/java/com/github/eventsource/client/DebugClient.java
@@ -21,6 +21,11 @@ public void onError(Throwable t) {
System.err.println("ERROR");
t.printStackTrace();
}
+
+ @Override
+ public void onClosed(boolean willReconnect) {
+ System.err.println("CLOSED");
+ }
});
es.connect();
diff --git a/src/test/java/com/github/eventsource/client/EventSourceClientTest.java b/src/test/java/com/github/eventsource/client/EventSourceClientTest.java
index 02fb54a..27c44d9 100644
--- a/src/test/java/com/github/eventsource/client/EventSourceClientTest.java
+++ b/src/test/java/com/github/eventsource/client/EventSourceClientTest.java
@@ -105,6 +105,11 @@ public void onError(Throwable t) {
System.out.println("ERROR: " + t);
errorCountdown.countDown();
}
+
+ @Override
+ public void onClosed(boolean willReconnect) {
+ System.out.println("CLOSED");
+ }
});
eventSource.connect();
@@ -147,6 +152,10 @@ public void onMessage(String event, com.github.eventsource.client.MessageEvent m
public void onError(Throwable t) {
errorCountdown.countDown();
}
+
+ @Override
+ public void onClosed(boolean willReconnect) {
+ }
});
eventSource.connect().await();
}