diff --git a/.gitignore b/.gitignore index 20502b0..259bf26 100644 --- a/.gitignore +++ b/.gitignore @@ -7,3 +7,5 @@ release.properties pom.xml.releaseBackup .DS_Store pom.xml.asc +.gradle +/build diff --git a/build.gradle b/build.gradle new file mode 100644 index 0000000..c797b05 --- /dev/null +++ b/build.gradle @@ -0,0 +1,21 @@ +apply plugin: 'java' +apply plugin: 'maven' + +group = GROUP +version = VERSION + +description = "SSE EventSource Java Client" + +sourceCompatibility = 1.7 +targetCompatibility = 1.7 + +repositories { + maven { url "http://repo.maven.apache.org/maven2" } +} +dependencies { + compile group: 'org.slf4j', name: 'slf4j-api', version:'1.7.6' + compile group: 'org.jboss.netty', name: 'netty', version:'3.2.4.Final' + testCompile group: 'org.webbitserver', name: 'webbit', version:'0.1.13' + testCompile group: 'junit', name: 'junit', version:'4.8.2' + testCompile group: 'org.mockito', name: 'mockito-all', version:'1.8.5' +} diff --git a/eventsource-client.iml b/eventsource-client.iml deleted file mode 100644 index cd369a0..0000000 --- a/eventsource-client.iml +++ /dev/null @@ -1,25 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - - - - diff --git a/eventsource-client.ipr b/eventsource-client.ipr deleted file mode 100644 index 95c6fbf..0000000 --- a/eventsource-client.ipr +++ /dev/null @@ -1,294 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - http://www.w3.org/1999/xhtml - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - diff --git a/gradle.properties b/gradle.properties new file mode 100644 index 0000000..c0235fb --- /dev/null +++ b/gradle.properties @@ -0,0 +1,22 @@ +# Project-wide Gradle settings. + +# IDE (e.g. Android Studio) users: +# Settings specified in this file will override any Gradle settings +# configured through the IDE. + +# For more details on how to configure your build environment visit +# http://www.gradle.org/docs/current/userguide/build_environment.html + +# Specifies the JVM arguments used for the daemon process. +# The setting is particularly useful for tweaking memory settings. +# Default value: -Xmx10248m -XX:MaxPermSize=256m +# org.gradle.jvmargs=-Xmx2048m -XX:MaxPermSize=512m -XX:+HeapDumpOnOutOfMemoryError -Dfile.encoding=UTF-8 + +# When configured, Gradle will run in incubating parallel mode. +# This option should only be used with decoupled projects. More details, visit +# http://www.gradle.org/docs/current/userguide/multi_project_builds.html#sec:decoupled_projects +# org.gradle.parallel=true + +GROUP=com.github.flocsy +VERSION=0.2-SNAPSHOT +ARTIFACT_ID=eventsource-client diff --git a/pom.xml b/pom.xml index 1f60e61..c6d35e0 100644 --- a/pom.xml +++ b/pom.xml @@ -1,17 +1,12 @@ 4.0.0 - com.github.aslakhellesoy + com.github.flocsy eventsource-client ${project.artifactId} A Java EventSource Client - http://aslakhellesoy.github.com/eventsource-java - 0.1.2.1 + http://github.com/flocsy/eventsource-java + 0.2-SNAPSHOT jar - - org.sonatype.oss - oss-parent - 6 - BSD License @@ -19,18 +14,13 @@ repo - - scm:git:git://github.com/aslakhellesoy/eventsource-java.git - scm:git:git@github.com:aslakhellesoy/eventsource-java.git - git://github.com/aslakhellesoy/eventsource-java.git - - - - repository.jboss.org - http://repository.jboss.org/nexus/content/groups/public/ - - + + org.slf4j + slf4j-api + 1.7.6 + compile + org.jboss.netty netty @@ -67,23 +57,18 @@ 1.6 - - org.apache.maven.plugins - maven-gpg-plugin - 1.1 - - true - - - - sign-artifacts - verify - - sign - - - - + + + nx-releases + Nexus Releases + ${nexus.releases.url} + + + nx-snapshots + Nexus Snapshots + ${nexus.snapshots.url} + + diff --git a/src/main/java/com/github/eventsource/client/EventSource.java b/src/main/java/com/github/eventsource/client/EventSource.java index 17d8cce..c7d9665 100644 --- a/src/main/java/com/github/eventsource/client/EventSource.java +++ b/src/main/java/com/github/eventsource/client/EventSource.java @@ -12,13 +12,15 @@ import org.jboss.netty.handler.codec.frame.Delimiters; import org.jboss.netty.handler.codec.http.HttpRequestEncoder; import org.jboss.netty.handler.codec.string.StringDecoder; +import org.jboss.netty.handler.ssl.SslHandler; +import javax.net.ssl.SSLEngine; import java.net.InetSocketAddress; import java.net.URI; -import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -public class EventSource { +public class EventSource implements EventSourceHandler { public static final long DEFAULT_RECONNECTION_TIME_MILLIS = 2000; public static final int CONNECTING = 0; @@ -27,7 +29,9 @@ public class EventSource { private final ClientBootstrap bootstrap; private final EventSourceChannelHandler clientHandler; + private final EventSourceHandler eventSourceHandler; + private URI uri; private int readyState; /** @@ -43,18 +47,37 @@ public class EventSource { * @param eventSourceHandler receives events * @see #close() */ - public EventSource(Executor executor, long reconnectionTimeMillis, final URI uri, EventSourceHandler eventSourceHandler) { + protected EventSource(ExecutorService executor, long reconnectionTimeMillis, final URI uri, EventSourceHandler eventSourceHandler) { + this(executor, reconnectionTimeMillis, uri, null, eventSourceHandler); + } + + protected EventSource(ExecutorService executor, long reconnectionTimeMillis, final URI uri, final SSLEngineProvider sslEngineProvider, EventSourceHandler eventSourceHandler) { + this.eventSourceHandler = eventSourceHandler; + bootstrap = new ClientBootstrap( new NioClientSocketChannelFactory( Executors.newSingleThreadExecutor(), Executors.newSingleThreadExecutor())); - bootstrap.setOption("remoteAddress", new InetSocketAddress(uri.getHost(), uri.getPort())); + this.uri = uri; + + bootstrap.setOption("remoteAddress", new InetSocketAddress(uri.getHost(), getPort(uri))); + +// clientHandler = new EventSourceChannelHandler(new AsyncEventSourceHandler(executor, eventSourceHandler), reconnectionTimeMillis, bootstrap, uri); + // add this class as the event source handler so the connect() call can be intercepted + AsyncEventSourceHandler asyncHandler = new AsyncEventSourceHandler(executor, this); - clientHandler = new EventSourceChannelHandler(new AsyncEventSourceHandler(executor, eventSourceHandler), reconnectionTimeMillis, bootstrap, uri); + clientHandler = new EventSourceChannelHandler(asyncHandler, reconnectionTimeMillis, bootstrap, uri); bootstrap.setPipelineFactory(new ChannelPipelineFactory() { public ChannelPipeline getPipeline() throws Exception { ChannelPipeline pipeline = Channels.pipeline(); + + if (uri.getScheme().equals("https") && sslEngineProvider != null) { + SSLEngine engine = sslEngineProvider.createSSLEngine(); + engine.setUseClientMode(true); + pipeline.addLast("ssl", new SslHandler(engine)); + } + pipeline.addLast("line", new DelimiterBasedFrameDecoder(Integer.MAX_VALUE, Delimiters.lineDelimiter())); pipeline.addLast("string", new StringDecoder()); @@ -65,25 +88,56 @@ public ChannelPipeline getPipeline() throws Exception { }); } - public EventSource(String uri, EventSourceHandler eventSourceHandler) { - this(URI.create(uri), eventSourceHandler); + protected EventSource(String uri, EventSourceHandler eventSourceHandler) { + this(uri, null, eventSourceHandler); + } + + protected EventSource(String uri, SSLEngineProvider sslEngineProvider, EventSourceHandler eventSourceHandler) { + this(URI.create(uri), sslEngineProvider, eventSourceHandler); } - public EventSource(URI uri, EventSourceHandler eventSourceHandler) { - this(Executors.newSingleThreadExecutor(), DEFAULT_RECONNECTION_TIME_MILLIS, uri, eventSourceHandler); + protected EventSource(URI uri, EventSourceHandler eventSourceHandler) { + this(uri, null, eventSourceHandler); + } + + protected EventSource(URI uri, SSLEngineProvider sslEngineProvider, EventSourceHandler eventSourceHandler) { + this(Executors.newSingleThreadExecutor(), DEFAULT_RECONNECTION_TIME_MILLIS, uri, sslEngineProvider, eventSourceHandler); + } + + /** + * Sets a custom HTTP header that will be used when the request is made to establish the SSE channel. + * + * @param name the HTTP header name + * @param value the header value + */ + public void setCustomRequestHeader(String name, String value) { + clientHandler.setCustomRequestHeader(name, value); } public ChannelFuture connect() { readyState = CONNECTING; + + //To avoid perpetual "SocketUnresolvedException" + bootstrap.setOption("remoteAddress", new InetSocketAddress(uri.getHost(), getPort(uri))); + return bootstrap.connect(); } + public boolean isConnected() { + return (readyState == OPEN); + } + + public int getReadyState() { + return readyState; + } + /** * Close the connection * * @return self */ public EventSource close() { + readyState = CLOSED; clientHandler.close(); return this; } @@ -98,4 +152,38 @@ public EventSource join() throws InterruptedException { clientHandler.join(); return this; } + + @Override + public void onConnect() throws Exception { + // flag the connection as open + readyState = OPEN; + + // pass event to the proper handler + eventSourceHandler.onConnect(); + } + + @Override + public void onMessage(String event, MessageEvent message) throws Exception { + // pass event to the proper handler + eventSourceHandler.onMessage(event, message); + } + + @Override + public void onError(Throwable t) { + // pass event to the proper handler + eventSourceHandler.onError(t); + } + + @Override + public void onClosed(boolean willReconnect) { + eventSourceHandler.onClosed(willReconnect); + } + + static public int getPort(URI uri) { + int port = uri.getPort(); + if (port == -1) { + port = (uri.getScheme().equals("https")) ? 443 : 80; + } + return port; + } } diff --git a/src/main/java/com/github/eventsource/client/EventSourceHandler.java b/src/main/java/com/github/eventsource/client/EventSourceHandler.java index b50b3d0..2ec35bf 100644 --- a/src/main/java/com/github/eventsource/client/EventSourceHandler.java +++ b/src/main/java/com/github/eventsource/client/EventSourceHandler.java @@ -4,4 +4,5 @@ public interface EventSourceHandler { void onConnect() throws Exception; void onMessage(String event, MessageEvent message) throws Exception; void onError(Throwable t); + void onClosed(boolean willReconnect); } diff --git a/src/main/java/com/github/eventsource/client/SSLEngineProvider.java b/src/main/java/com/github/eventsource/client/SSLEngineProvider.java new file mode 100644 index 0000000..9f6ac28 --- /dev/null +++ b/src/main/java/com/github/eventsource/client/SSLEngineProvider.java @@ -0,0 +1,7 @@ +package com.github.eventsource.client; + +import javax.net.ssl.SSLEngine; + +public interface SSLEngineProvider { + SSLEngine createSSLEngine(); +} \ No newline at end of file diff --git a/src/main/java/com/github/eventsource/client/StoppableEventSource.java b/src/main/java/com/github/eventsource/client/StoppableEventSource.java new file mode 100644 index 0000000..bf149a0 --- /dev/null +++ b/src/main/java/com/github/eventsource/client/StoppableEventSource.java @@ -0,0 +1,78 @@ +package com.github.eventsource.client; + +import java.net.URI; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +public class StoppableEventSource extends EventSource { + static final String TAG = "StoppableEventSource"; + + private final ExecutorService executorService; + +// private StoppableEventSource(Executor executor, long reconnectionTimeMillis, final URI uri, final SSLEngineProvider sslEngineProvider, EventSourceHandler eventSourceHandler) { +// super(executor, reconnectionTimeMillis, uri, sslEngineProvider, eventSourceHandler); +// } +// +// private StoppableEventSource(Executor executor, long reconnectionTimeMillis, final URI uri, EventSourceHandler eventSourceHandler) { +// this(executor, reconnectionTimeMillis, uri, null, eventSourceHandler); +// } + + private StoppableEventSource(ExecutorService executorService, long reconnectionTimeMillis, final URI uri, final SSLEngineProvider sslEngineProvider, EventSourceHandler eventSourceHandler) { + super(executorService, reconnectionTimeMillis, uri, sslEngineProvider, eventSourceHandler); + this.executorService = executorService; + } + + public StoppableEventSource(URI uri, SSLEngineProvider sslEngineProvider, EventSourceHandler eventSourceHandler) { + this(Executors.newSingleThreadExecutor(), DEFAULT_RECONNECTION_TIME_MILLIS, uri, sslEngineProvider, eventSourceHandler); + } + + public StoppableEventSource(String uri, SSLEngineProvider sslEngineProvider, EventSourceHandler eventSourceHandler) { + this(URI.create(uri), sslEngineProvider, eventSourceHandler); + } + + public StoppableEventSource(String uri, EventSourceHandler eventSourceHandler) { + this(uri, null, eventSourceHandler); + } + + public StoppableEventSource(URI uri, EventSourceHandler eventSourceHandler) { + this(uri, null, eventSourceHandler); + } + + @Override + protected void finalize() { + System.out.println("finalize"); + } + + public void shutdownNow() { + shutdownAndAwaitTermination(1, TimeUnit.SECONDS); + } + + public void shutdownAndAwaitTermination(final long timeout, final TimeUnit unit) { + close(); + executorService.shutdown(); // Disable new tasks from being submitted + try { + // Wait a while for existing tasks to terminate + if (!executorService.awaitTermination(timeout, unit)) { + executorService.shutdownNow(); // Cancel currently executing tasks + // Wait a while for tasks to respond to being cancelled + if (!executorService.awaitTermination(timeout, unit)) { + System.err.println("Pool did not terminate"); + } + } + } catch (InterruptedException ie) { + // (Re-)Cancel if current thread also interrupted + executorService.shutdownNow(); + // Preserve interrupt status + Thread.currentThread().interrupt(); + } + } + + public boolean isShutdown() { + return executorService.isShutdown(); + } + + public boolean isTerminated() { + return executorService.isTerminated(); + } +} diff --git a/src/main/java/com/github/eventsource/client/impl/AsyncEventSourceHandler.java b/src/main/java/com/github/eventsource/client/impl/AsyncEventSourceHandler.java index 89864f2..1d7297c 100644 --- a/src/main/java/com/github/eventsource/client/impl/AsyncEventSourceHandler.java +++ b/src/main/java/com/github/eventsource/client/impl/AsyncEventSourceHandler.java @@ -3,56 +3,78 @@ import com.github.eventsource.client.EventSourceHandler; import com.github.eventsource.client.MessageEvent; -import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; public class AsyncEventSourceHandler implements EventSourceHandler { - private final Executor executor; + private final ExecutorService executor; private final EventSourceHandler eventSourceHandler; - public AsyncEventSourceHandler(Executor executor, EventSourceHandler eventSourceHandler) { + public AsyncEventSourceHandler(ExecutorService executor, EventSourceHandler eventSourceHandler) { this.executor = executor; this.eventSourceHandler = eventSourceHandler; } @Override public void onConnect() { - executor.execute(new Runnable() { - @Override - public void run() { - try { - eventSourceHandler.onConnect(); - } catch (Exception e) { - onError(e); + if (!executor.isShutdown()) { + executor.execute(new Runnable() { + @Override + public void run() { + try { + eventSourceHandler.onConnect(); + } catch (Exception e) { + onError(e); + } } - } - }); + }); + } } @Override public void onMessage(final String event, final MessageEvent message) { - executor.execute(new Runnable() { - @Override - public void run() { - try { - eventSourceHandler.onMessage(event, message); - } catch (Exception e) { - onError(e); + if (!executor.isShutdown()) { + executor.execute(new Runnable() { + @Override + public void run() { + try { + eventSourceHandler.onMessage(event, message); + } catch (Exception e) { + onError(e); + } } - } - }); + }); + } } @Override public void onError(final Throwable error) { - executor.execute(new Runnable() { - @Override - public void run() { - try { - eventSourceHandler.onError(error); - } catch (Throwable e) { - e.printStackTrace(); + if (!executor.isShutdown()) { + executor.execute(new Runnable() { + @Override + public void run() { + try { + eventSourceHandler.onError(error); + } catch (Throwable e) { + e.printStackTrace(); + } } - } - }); + }); + } + } + + @Override + public void onClosed(final boolean willReconnect) { + if (!executor.isShutdown()) { + executor.execute(new Runnable() { + @Override + public void run() { + try { + eventSourceHandler.onClosed(willReconnect); + } catch (Exception e) { + onError(e); + } + } + }); + } } } diff --git a/src/main/java/com/github/eventsource/client/impl/EventStreamParser.java b/src/main/java/com/github/eventsource/client/impl/EventStreamParser.java index e651eec..90d41d2 100644 --- a/src/main/java/com/github/eventsource/client/impl/EventStreamParser.java +++ b/src/main/java/com/github/eventsource/client/impl/EventStreamParser.java @@ -3,7 +3,6 @@ import com.github.eventsource.client.EventSourceHandler; import com.github.eventsource.client.MessageEvent; -import java.io.StringReader; import java.util.regex.Pattern; /** diff --git a/src/main/java/com/github/eventsource/client/impl/netty/EventSourceChannelHandler.java b/src/main/java/com/github/eventsource/client/impl/netty/EventSourceChannelHandler.java index 6ad7d7b..53f8b5d 100644 --- a/src/main/java/com/github/eventsource/client/impl/netty/EventSourceChannelHandler.java +++ b/src/main/java/com/github/eventsource/client/impl/netty/EventSourceChannelHandler.java @@ -1,11 +1,20 @@ package com.github.eventsource.client.impl.netty; +import com.github.eventsource.client.EventSource; import com.github.eventsource.client.EventSourceException; import com.github.eventsource.client.EventSourceHandler; import com.github.eventsource.client.impl.ConnectionHandler; import com.github.eventsource.client.impl.EventStreamParser; import org.jboss.netty.bootstrap.ClientBootstrap; -import org.jboss.netty.channel.*; + +import org.jboss.netty.channel.SimpleChannelUpstreamHandler; +import org.jboss.netty.channel.ChannelHandlerContext; +import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.ChannelEvent; +import org.jboss.netty.channel.ChannelStateEvent; +import org.jboss.netty.channel.MessageEvent; +import org.jboss.netty.channel.ExceptionEvent; + import org.jboss.netty.handler.codec.http.DefaultHttpRequest; import org.jboss.netty.handler.codec.http.HttpHeaders.Names; import org.jboss.netty.handler.codec.http.HttpMethod; @@ -17,7 +26,10 @@ import org.jboss.netty.util.TimerTask; import java.net.ConnectException; +import java.net.InetSocketAddress; import java.net.URI; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.regex.Matcher; @@ -25,12 +37,13 @@ public class EventSourceChannelHandler extends SimpleChannelUpstreamHandler implements ConnectionHandler { private static final Pattern STATUS_PATTERN = Pattern.compile("HTTP/1.1 (\\d+) (.*)"); - private static final Pattern CONTENT_TYPE_PATTERN = Pattern.compile("Content-Type: text/event-stream"); + private static final Pattern CONTENT_TYPE_PATTERN = Pattern.compile("Content-Type: text/event-stream(;.*)?", Pattern.CASE_INSENSITIVE); private final EventSourceHandler eventSourceHandler; private final ClientBootstrap bootstrap; private final URI uri; private final EventStreamParser messageDispatcher; + private final Map customRequestHeaders = new HashMap(); private final Timer timer = new HashedWheelTimer(); private Channel channel; @@ -57,14 +70,23 @@ public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exc @Override public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { - HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, uri.toString()); + final String query = uri.getQuery(); + final String path = uri.getPath() + (((null != query) && !query.isEmpty()) ? "?" + query : ""); + final int port = uri.getPort(); + final String portPostfix = ((port != -1) ? ":" + port : ""); + final String host = uri.getHost() + portPostfix; + final HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, path); request.addHeader(Names.ACCEPT, "text/event-stream"); - request.addHeader(Names.HOST, uri.getHost()); - request.addHeader(Names.ORIGIN, "http://" + uri.getHost()); + request.addHeader(Names.HOST, host); + request.addHeader(Names.ORIGIN, uri.getScheme() + "://" + host); request.addHeader(Names.CACHE_CONTROL, "no-cache"); if (lastEventId != null) { request.addHeader("Last-Event-ID", lastEventId); } + // add any custom headers that have been set + for (String name : customRequestHeaders.keySet()) { + request.addHeader(name, customRequestHeaders.get(name)); + } e.getChannel().write(request); channel = e.getChannel(); } @@ -76,6 +98,10 @@ public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) @Override public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { + if (eventStreamOk) { + // call onClosed only if it was successfully opened (and onConnect was called) + eventSourceHandler.onClosed(reconnectOnClose); + } if (reconnectOnClose) { reconnect(); } @@ -135,7 +161,7 @@ public void setLastEventId(String lastEventId) { this.lastEventId = lastEventId; } - public EventSourceChannelHandler close() { + public synchronized EventSourceChannelHandler close() { reconnectOnClose = false; if (channel != null) { channel.close(); @@ -150,16 +176,28 @@ public EventSourceChannelHandler join() throws InterruptedException { return this; } + /** + * Sets a custom HTTP header that will be used when the request is made to establish the SSE channel. + * + * @param name the HTTP header name + * @param value the header value + */ + public void setCustomRequestHeader(String name, String value) { + customRequestHeaders.put(name, value); + } + private void reconnect() { - if(!reconnecting.get()) { - reconnecting.set(true); + if (reconnecting.compareAndSet(false, true)) { + headerDone = false; + eventStreamOk = false; timer.newTimeout(new TimerTask() { @Override public void run(Timeout timeout) throws Exception { reconnecting.set(false); + bootstrap.setOption("remoteAddress", new InetSocketAddress(uri.getHost(), EventSource.getPort(uri))); bootstrap.connect().await(); } }, reconnectionTimeMillis, TimeUnit.MILLISECONDS); } } -} \ No newline at end of file +} diff --git a/src/main/java/com/github/eventsource/client/stubs/StubHandler.java b/src/main/java/com/github/eventsource/client/stubs/StubHandler.java index 5977afb..01856db 100644 --- a/src/main/java/com/github/eventsource/client/stubs/StubHandler.java +++ b/src/main/java/com/github/eventsource/client/stubs/StubHandler.java @@ -32,6 +32,11 @@ public void onConnect() throws Exception { connected = true; } + @Override + public void onClosed(boolean willReconnect){ + connected = false; + } + @Override public void onMessage(String event, MessageEvent message) throws Exception { getMessageEvents(event).add(message); diff --git a/src/test/java/com/github/eventsource/client/DebugClient.java b/src/test/java/com/github/eventsource/client/DebugClient.java index 3c11876..5ce75fb 100644 --- a/src/test/java/com/github/eventsource/client/DebugClient.java +++ b/src/test/java/com/github/eventsource/client/DebugClient.java @@ -21,6 +21,11 @@ public void onError(Throwable t) { System.err.println("ERROR"); t.printStackTrace(); } + + @Override + public void onClosed(boolean willReconnect) { + System.err.println("CLOSED"); + } }); es.connect(); diff --git a/src/test/java/com/github/eventsource/client/EventSourceClientTest.java b/src/test/java/com/github/eventsource/client/EventSourceClientTest.java index 02fb54a..27c44d9 100644 --- a/src/test/java/com/github/eventsource/client/EventSourceClientTest.java +++ b/src/test/java/com/github/eventsource/client/EventSourceClientTest.java @@ -105,6 +105,11 @@ public void onError(Throwable t) { System.out.println("ERROR: " + t); errorCountdown.countDown(); } + + @Override + public void onClosed(boolean willReconnect) { + System.out.println("CLOSED"); + } }); eventSource.connect(); @@ -147,6 +152,10 @@ public void onMessage(String event, com.github.eventsource.client.MessageEvent m public void onError(Throwable t) { errorCountdown.countDown(); } + + @Override + public void onClosed(boolean willReconnect) { + } }); eventSource.connect().await(); }