Skip to content

Commit

Permalink
close
Browse files Browse the repository at this point in the history
  • Loading branch information
chenby committed Apr 10, 2019
1 parent d754062 commit 23b9003
Show file tree
Hide file tree
Showing 15 changed files with 139 additions and 26 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
### 2.6.4
Add `StatusListener` .

### 2.6.3
Fix `RedisScoketReplicator.executor` shutdown too early bug.

Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ redis 2.6 - 5.0
<dependency>
<groupId>com.moilioncircle</groupId>
<artifactId>redis-replicator</artifactId>
<version>2.6.3</version>
<version>2.6.4</version>
</dependency>
```

Expand Down
2 changes: 1 addition & 1 deletion README.zh_CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ redis 2.6 - 5.0
<dependency>
<groupId>com.moilioncircle</groupId>
<artifactId>redis-replicator</artifactId>
<version>2.6.3</version>
<version>2.6.4</version>
</dependency>
```

Expand Down
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

<groupId>com.moilioncircle</groupId>
<artifactId>redis-replicator</artifactId>
<version>2.6.3</version>
<version>2.6.4</version>

<name>redis-replicator</name>
<description>
Expand Down Expand Up @@ -74,7 +74,7 @@
<connection>scm:git:[email protected]:leonchen83/redis-replicator.git</connection>
<url>scm:git:[email protected]:leonchen83/redis-replicator.git</url>
<developerConnection>scm:git:[email protected]:leonchen83/redis-replicator.git</developerConnection>
<tag>2.6.3</tag>
<tag>2.x</tag>
</scm>

<issueManagement>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import static com.moilioncircle.redis.replicator.Status.CONNECTED;
Expand All @@ -133,6 +134,7 @@ public abstract class AbstractReplicator extends AbstractReplicatorListener impl
protected Configuration configuration;
protected RedisInputStream inputStream;
protected RdbVisitor rdbVisitor = new DefaultRdbVisitor(this);
protected final AtomicBoolean manual = new AtomicBoolean(false);
protected final AtomicReference<Status> connected = new AtomicReference<>(DISCONNECTED);
protected final Map<ModuleKey, ModuleParser<? extends Module>> modules = new ConcurrentHashMap<>();
protected final Map<CommandName, CommandParser<? extends Command>> commands = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -187,7 +189,18 @@ public void submitEvent(Event event) {
doExceptionListener(this, e, event);
}
}


protected boolean compareAndSet(Status prev, Status next) {
boolean result = connected.compareAndSet(prev, next);
if (result) doStatusListener(this, next);
return result;
}

protected void setStatus(Status next) {
connected.set(next);
doStatusListener(this, next);
}

@Override
public boolean verbose() {
return configuration != null && configuration.isVerbose();
Expand Down Expand Up @@ -300,14 +313,23 @@ public void builtInCommandParserRegister() {
addCommandParser(CommandName.name("XTRIM"), new XTrimParser());
addCommandParser(CommandName.name("XSETID"), new XSetIdParser());
}


public void open() throws IOException {
manual.compareAndSet(true, false);
}

@Override
public void close() throws IOException {
this.connected.compareAndSet(CONNECTED, DISCONNECTING);
compareAndSet(CONNECTED, DISCONNECTING);
manual.compareAndSet(false, true);
}

protected boolean isClosed() {
return manual.get();
}

protected void doClose() throws IOException {
this.connected.compareAndSet(CONNECTED, DISCONNECTING);
compareAndSet(CONNECTED, DISCONNECTING);
try {
if (inputStream != null) {
this.inputStream.setRawByteListeners(null);
Expand All @@ -316,7 +338,7 @@ protected void doClose() throws IOException {
} catch (IOException ignore) {
/*NOP*/
} finally {
this.connected.set(DISCONNECTED);
setStatus(DISCONNECTED);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
public class AbstractReplicatorListener implements ReplicatorListener {
protected final List<RdbListener> rdbListeners = new CopyOnWriteArrayList<>();
protected final List<CloseListener> closeListeners = new CopyOnWriteArrayList<>();
protected final List<StatusListener> statusListeners = new CopyOnWriteArrayList<>();
protected final List<CommandListener> commandListeners = new CopyOnWriteArrayList<>();
protected final List<RawByteListener> rawByteListeners = new CopyOnWriteArrayList<>();
protected final List<AuxFieldListener> auxFieldListeners = new CopyOnWriteArrayList<>();
Expand Down Expand Up @@ -99,6 +100,16 @@ public boolean removeExceptionListener(ExceptionListener listener) {
return exceptionListeners.remove(listener);
}

@Override
public boolean addStatusListener(StatusListener listener) {
return statusListeners.add(listener);
}

@Override
public boolean removeStatusListener(StatusListener listener) {
return statusListeners.remove(listener);
}

/**
* @param rawBytes input stream raw bytes
* @since 2.2.0
Expand Down Expand Up @@ -158,6 +169,13 @@ protected void doExceptionListener(Replicator replicator, Throwable throwable, O
}
}

protected void doStatusListener(Replicator replicator, Status status) {
if (statusListeners.isEmpty()) return;
for (StatusListener listener : statusListeners) {
listener.handle(replicator, status);
}
}

protected void doRawByteListener(byte... bytes) {
if (rawByteListeners.isEmpty()) return;
for (RawByteListener listener : rawByteListeners) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@
package com.moilioncircle.redis.replicator;

import java.io.IOException;

import static com.moilioncircle.redis.replicator.Status.CONNECTED;
import java.io.UncheckedIOException;

/**
* @author Leon Chen
Expand All @@ -27,6 +26,8 @@
abstract class AbstractReplicatorRetrier implements ReplicatorRetrier {
protected int retries = 0;

protected abstract boolean isManualClosed();

protected abstract boolean open() throws IOException;

protected abstract boolean connect() throws IOException;
Expand All @@ -39,6 +40,7 @@ public void retry(Replicator replicator) throws IOException {
Configuration configuration = replicator.getConfiguration();
for (; retries < configuration.getRetries() || configuration.getRetries() <= 0; retries++) {
exception = null;
if (isManualClosed()) break;
final long interval = configuration.getRetryTimeInterval();
try {
if (connect()) {
Expand All @@ -53,11 +55,6 @@ public void retry(Replicator replicator) throws IOException {
exception = null;
break;
} catch (IOException | UncheckedIOException e) {
//close manually
if (replicator.getStatus() != CONNECTED) {
exception = null;
break;
}
exception = translate(e);
close(exception);
sleep(interval);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ public RedisAofReplicator(InputStream in, Configuration configuration) {

@Override
public void open() throws IOException {
if (!this.connected.compareAndSet(DISCONNECTED, CONNECTED)) return;
super.open();
if (!compareAndSet(DISCONNECTED, CONNECTED)) return;
try {
doOpen();
} catch (EOFException ignore) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ public RedisMixReplicator(InputStream in, Configuration configuration) {

@Override
public void open() throws IOException {
if (!this.connected.compareAndSet(DISCONNECTED, CONNECTED)) return;
super.open();
if (!compareAndSet(DISCONNECTED, CONNECTED)) return;
try {
doOpen();
} catch (EOFException ignore) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ public RedisRdbReplicator(InputStream in, Configuration configuration) {

@Override
public void open() throws IOException {
if (!this.connected.compareAndSet(DISCONNECTED, CONNECTED)) return;
super.open();
if (!compareAndSet(DISCONNECTED, CONNECTED)) return;
try {
doOpen();
} catch (EOFException ignore) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,16 @@ public boolean removeExceptionListener(ExceptionListener listener) {
return replicator.removeExceptionListener(listener);
}

@Override
public boolean addStatusListener(StatusListener listener) {
return replicator.addStatusListener(listener);
}

@Override
public boolean removeStatusListener(StatusListener listener) {
return replicator.removeStatusListener(listener);
}

@Override
public boolean verbose() {
return replicator.verbose();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public class RedisSocketReplicator extends AbstractReplicator {
protected ReplyParser replyParser;
protected ScheduledFuture<?> heartbeat;
protected RedisOutputStream outputStream;
protected ScheduledExecutorService executor;
protected ScheduledExecutorService executor;
protected final RedisSocketFactory socketFactory;

public RedisSocketReplicator(String host, int port, Configuration configuration) {
Expand All @@ -94,7 +94,8 @@ public RedisSocketReplicator(String host, int port, Configuration configuration)
*/
@Override
public void open() throws IOException {
this.executor = Executors.newSingleThreadScheduledExecutor();
super.open();
this.executor = Executors.newSingleThreadScheduledExecutor();
try {
new RedisSocketReplicatorRetrier().retry(this);
} finally {
Expand Down Expand Up @@ -273,7 +274,7 @@ protected <T> T reply(BulkReplyHandler handler) throws IOException {
}

protected void connect() throws IOException {
if (!connected.compareAndSet(DISCONNECTED, CONNECTING)) return;
if (!compareAndSet(DISCONNECTED, CONNECTING)) return;
try {
socket = socketFactory.createSocket(host, port, configuration.getConnectionTimeout());
outputStream = new RedisOutputStream(socket.getOutputStream());
Expand All @@ -289,14 +290,14 @@ protected void connect() throws IOException {
replyParser = new ReplyParser(this.inputStream, new RedisCodec());
logger.info("Connected to redis-server[{}:{}]", host, port);
} finally {
connected.set(CONNECTED);
setStatus(CONNECTED);
}
}

@Override
protected void doClose() throws IOException {
connected.compareAndSet(CONNECTED, DISCONNECTING);

compareAndSet(CONNECTED, DISCONNECTING);
try {
if (heartbeat != null) {
if (!heartbeat.isCancelled()) heartbeat.cancel(true);
Expand All @@ -323,7 +324,7 @@ protected void doClose() throws IOException {
}
logger.info("socket closed. redis-server[{}:{}]", host, port);
} finally {
connected.set(DISCONNECTED);
setStatus(DISCONNECTED);
}
}

Expand All @@ -347,6 +348,11 @@ protected boolean close(IOException reason) throws IOException {
return true;
}

@Override
protected boolean isManualClosed() {
return RedisSocketReplicator.this.isClosed();
}

@Override
protected boolean open() throws IOException {
String replId = configuration.getReplId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,11 @@ public interface ReplicatorListener {
boolean addExceptionListener(ExceptionListener listener);

boolean removeExceptionListener(ExceptionListener listener);

/*
* Connection
*/
boolean addStatusListener(StatusListener listener);

boolean removeStatusListener(StatusListener listener);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright 2016-2018 Leon Chen
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.moilioncircle.redis.replicator;

/**
* @author Leon Chen
* @since 2.6.4
*/
@FunctionalInterface
public interface StatusListener {
void handle(Replicator replicator, Status status);
}
21 changes: 21 additions & 0 deletions src/test/java/com/moilioncircle/redis/replicator/CloseTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,19 @@

import com.moilioncircle.redis.replicator.cmd.Command;
import com.moilioncircle.redis.replicator.cmd.CommandListener;
import com.moilioncircle.redis.replicator.event.Event;
import com.moilioncircle.redis.replicator.io.RateLimitInputStream;
import com.moilioncircle.redis.replicator.rdb.RdbListener;
import com.moilioncircle.redis.replicator.rdb.datatype.KeyValuePair;
import org.junit.Test;

import java.io.IOException;
import java.net.URISyntaxException;
import java.util.concurrent.atomic.AtomicInteger;

import static com.moilioncircle.redis.replicator.Status.DISCONNECTED;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

/**
* @author Leon Chen
Expand Down Expand Up @@ -247,4 +250,22 @@ public void postFullSync(Replicator replicator, long checksum) {
assertEquals(0, acc.get());
assertEquals(DISCONNECTED, replicator.getStatus());
}

@Test
public void testMixClose13() throws IOException, URISyntaxException, InterruptedException {
final Replicator replicator = new RedisReplicator("redis://127.0.0.1:7777?retries=-1");
new Thread() {
@Override
public void run() {
try {
replicator.open();
} catch (IOException e) {
}
}
}.start();
Thread.sleep(3500);
replicator.close();
Thread.sleep(2000);
assertEquals(DISCONNECTED, replicator.getStatus());
}
}

0 comments on commit 23b9003

Please sign in to comment.