Skip to content

Commit

Permalink
Revert "fix: handle BerkeleyJE DB interruption [tp-tests]" CTR [tp-te…
Browse files Browse the repository at this point in the history
…sts]

This reverts commit 90b9694.
Reason: tp-tests are failing after this commit.

Signed-off-by: Oleksandr Porunov <[email protected]>
  • Loading branch information
porunov committed Oct 18, 2024
1 parent 9f09767 commit 872a475
Show file tree
Hide file tree
Showing 9 changed files with 135 additions and 259 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,11 @@
import com.sleepycat.je.OperationStatus;
import com.sleepycat.je.Put;
import com.sleepycat.je.ReadOptions;
import com.sleepycat.je.ThreadInterruptedException;
import com.sleepycat.je.Transaction;
import com.sleepycat.je.WriteOptions;
import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalInterruptedException;
import org.janusgraph.diskstorage.BackendException;
import org.janusgraph.diskstorage.PermanentBackendException;
import org.janusgraph.diskstorage.StaticBuffer;
import org.janusgraph.diskstorage.TemporaryBackendException;
import org.janusgraph.diskstorage.keycolumnvalue.StoreTransaction;
import org.janusgraph.diskstorage.keycolumnvalue.keyvalue.KVQuery;
import org.janusgraph.diskstorage.keycolumnvalue.keyvalue.KeySelector;
Expand Down Expand Up @@ -63,10 +60,10 @@ public class BerkeleyJEKeyValueStore implements OrderedKeyValueStore {
public static Function<Integer, Integer> ttlConverter = ttl -> (int) Math.max(1, Duration.of(ttl, ChronoUnit.SECONDS).toHours());


private volatile Database db;
private final Database db;
private final String name;
private final BerkeleyJEStoreManager manager;
private volatile boolean isOpen;
private boolean isOpen;

public BerkeleyJEKeyValueStore(String n, Database data, BerkeleyJEStoreManager m) {
db = data;
Expand All @@ -78,11 +75,6 @@ public BerkeleyJEKeyValueStore(String n, Database data, BerkeleyJEStoreManager m
public DatabaseConfig getConfiguration() throws BackendException {
try {
return db.getConfig();
} catch (ThreadInterruptedException e) {
Thread.currentThread().interrupt();
throw (TraversalInterruptedException) new TraversalInterruptedException().initCause(e);
} catch (EnvironmentFailureException e) {
throw new TemporaryBackendException(e);
} catch (DatabaseException e) {
throw new PermanentBackendException(e);
}
Expand All @@ -103,24 +95,15 @@ private Cursor openCursor(StoreTransaction txh) throws BackendException {
return ((BerkeleyJETx) txh).openCursor(db);
}

private static void closeCursor(StoreTransaction txh, Cursor cursor) throws BackendException {
private static void closeCursor(StoreTransaction txh, Cursor cursor) {
Preconditions.checkArgument(txh!=null);
((BerkeleyJETx) txh).closeCursor(cursor);
}

public void reopen(final Database db) {
this.db = db;
}

@Override
public synchronized void close() throws BackendException {
try {
if(isOpen) db.close();
} catch (ThreadInterruptedException e) {
Thread.currentThread().interrupt();
throw (TraversalInterruptedException) new TraversalInterruptedException().initCause(e);
} catch (EnvironmentFailureException e) {
throw new TemporaryBackendException(e);
} catch (DatabaseException e) {
throw new PermanentBackendException(e);
}
Expand All @@ -144,11 +127,6 @@ public StaticBuffer get(StaticBuffer key, StoreTransaction txh) throws BackendEx
} else {
return null;
}
} catch (ThreadInterruptedException e) {
Thread.currentThread().interrupt();
throw (TraversalInterruptedException) new TraversalInterruptedException().initCause(e);
} catch (EnvironmentFailureException e) {
throw new TemporaryBackendException(e);
} catch (DatabaseException e) {
throw new PermanentBackendException(e);
}
Expand Down Expand Up @@ -183,11 +161,7 @@ public RecordIterator<KeyValueEntry> getSlice(KVQuery query, StoreTransaction tx
@Override
public boolean hasNext() {
if (current == null) {
try {
current = getNextEntry();
} catch (BackendException e) {
throw new RuntimeException(e);
}
current = getNextEntry();
}
return current != null;
}
Expand All @@ -202,26 +176,16 @@ public KeyValueEntry next() {
return next;
}

private KeyValueEntry getNextEntry() throws BackendException {
private KeyValueEntry getNextEntry() {
if (status != null && status != OperationStatus.SUCCESS) {
return null;
}
while (!selector.reachedLimit()) {
try {
if (status == null) {
status = cursor.get(foundKey, foundData, Get.SEARCH_GTE, getReadOptions(txh)) == null ? OperationStatus.NOTFOUND : OperationStatus.SUCCESS;
} else {
status = cursor.get(foundKey, foundData, Get.NEXT, getReadOptions(txh)) == null ? OperationStatus.NOTFOUND : OperationStatus.SUCCESS;
}
} catch (ThreadInterruptedException e) {
Thread.currentThread().interrupt();
throw (TraversalInterruptedException) new TraversalInterruptedException().initCause(e);
} catch (EnvironmentFailureException e) {
throw new TemporaryBackendException(e);
} catch (DatabaseException e) {
throw new PermanentBackendException(e);
if (status == null) {
status = cursor.get(foundKey, foundData, Get.SEARCH_GTE, getReadOptions(txh)) == null ? OperationStatus.NOTFOUND : OperationStatus.SUCCESS;
} else {
status = cursor.get(foundKey, foundData, Get.NEXT, getReadOptions(txh)) == null ? OperationStatus.NOTFOUND : OperationStatus.SUCCESS;
}

if (status != OperationStatus.SUCCESS) {
break;
}
Expand All @@ -241,11 +205,7 @@ private KeyValueEntry getNextEntry() throws BackendException {

@Override
public void close() {
try {
closeCursor(txh, cursor);
} catch (BackendException e) {
throw new RuntimeException(e);
}
closeCursor(txh, cursor);
}

@Override
Expand Down Expand Up @@ -277,22 +237,13 @@ public void insert(StaticBuffer key, StaticBuffer value, StoreTransaction txh, b
int convertedTtl = ttlConverter.apply(ttl);
writeOptions.setTTL(convertedTtl, TimeUnit.HOURS);
}
try {
if (allowOverwrite) {
OperationResult result = db.put(tx, key.as(ENTRY_FACTORY), value.as(ENTRY_FACTORY), Put.OVERWRITE, writeOptions);
EnvironmentFailureException.assertState(result != null);
status = OperationStatus.SUCCESS;
} else {
OperationResult result = db.put(tx, key.as(ENTRY_FACTORY), value.as(ENTRY_FACTORY), Put.NO_OVERWRITE, writeOptions);
status = result == null ? OperationStatus.KEYEXIST : OperationStatus.SUCCESS;
}
} catch (ThreadInterruptedException e) {
Thread.currentThread().interrupt();
throw (TraversalInterruptedException) new TraversalInterruptedException().initCause(e);
} catch (EnvironmentFailureException e) {
throw new TemporaryBackendException(e);
} catch (DatabaseException e) {
throw new PermanentBackendException(e);
if (allowOverwrite) {
OperationResult result = db.put(tx, key.as(ENTRY_FACTORY), value.as(ENTRY_FACTORY), Put.OVERWRITE, writeOptions);
EnvironmentFailureException.assertState(result != null);
status = OperationStatus.SUCCESS;
} else {
OperationResult result = db.put(tx, key.as(ENTRY_FACTORY), value.as(ENTRY_FACTORY), Put.NO_OVERWRITE, writeOptions);
status = result == null ? OperationStatus.KEYEXIST : OperationStatus.SUCCESS;
}

if (status != OperationStatus.SUCCESS) {
Expand All @@ -310,11 +261,6 @@ public void delete(StaticBuffer key, StoreTransaction txh) throws BackendExcepti
if (status != OperationStatus.SUCCESS && status != OperationStatus.NOTFOUND) {
throw new PermanentBackendException("Could not remove: " + status);
}
} catch (ThreadInterruptedException e) {
Thread.currentThread().interrupt();
throw (TraversalInterruptedException) new TraversalInterruptedException().initCause(e);
} catch (EnvironmentFailureException e) {
throw new TemporaryBackendException(e);
} catch (DatabaseException e) {
throw new PermanentBackendException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,13 @@
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.Environment;
import com.sleepycat.je.EnvironmentConfig;
import com.sleepycat.je.EnvironmentFailureException;
import com.sleepycat.je.LockMode;
import com.sleepycat.je.ThreadInterruptedException;
import com.sleepycat.je.Transaction;
import com.sleepycat.je.TransactionConfig;
import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalInterruptedException;
import org.janusgraph.diskstorage.BackendException;
import org.janusgraph.diskstorage.BaseTransactionConfig;
import org.janusgraph.diskstorage.PermanentBackendException;
import org.janusgraph.diskstorage.StaticBuffer;
import org.janusgraph.diskstorage.TemporaryBackendException;
import org.janusgraph.diskstorage.common.LocalStoreManager;
import org.janusgraph.diskstorage.configuration.ConfigNamespace;
import org.janusgraph.diskstorage.configuration.ConfigOption;
Expand All @@ -52,10 +48,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import static org.janusgraph.diskstorage.configuration.ConfigOption.disallowEmpty;

Expand Down Expand Up @@ -93,16 +88,19 @@ public class BerkeleyJEStoreManager extends LocalStoreManager implements Ordered
ConfigOption.Type.MASKABLE, String.class,
IsolationLevel.REPEATABLE_READ.toString(), disallowEmpty(String.class));

private final ConcurrentMap<String, BerkeleyJEKeyValueStore> stores;
private final Map<String, BerkeleyJEKeyValueStore> stores;

protected volatile Environment environment;
protected Environment environment;
protected final StoreFeatures features;

public BerkeleyJEStoreManager(Configuration configuration) throws BackendException {
super(configuration);
stores = new ConcurrentHashMap<>();
stores = new HashMap<>();

initialize();
int cachePercentage = configuration.get(JVM_CACHE);
boolean sharedCache = configuration.get(SHARED_CACHE);
CacheMode cacheMode = ConfigOption.getEnumValue(configuration.get(CACHE_MODE), CacheMode.class);
initialize(cachePercentage, sharedCache, cacheMode);

features = new StandardStoreFeatures.Builder()
.orderedScan(true)
Expand All @@ -113,24 +111,14 @@ public BerkeleyJEStoreManager(Configuration configuration) throws BackendExcepti
.scanTxConfig(GraphDatabaseConfiguration.buildGraphConfiguration()
.set(ISOLATION_LEVEL, IsolationLevel.READ_UNCOMMITTED.toString())
)
.supportsInterruption(true)
.supportsInterruption(false)
.cellTTL(true)
.optimisticLocking(false)
.build();
}

private synchronized void initialize() throws BackendException {
private void initialize(int cachePercent, final boolean sharedCache, final CacheMode cacheMode) throws BackendException {
try {
if (environment != null && environment.isValid()) {
return;
}

close(true);

int cachePercent = storageConfig.get(JVM_CACHE);
boolean sharedCache = storageConfig.get(SHARED_CACHE);
CacheMode cacheMode = ConfigOption.getEnumValue(storageConfig.get(CACHE_MODE), CacheMode.class);

EnvironmentConfig envConfig = new EnvironmentConfig();
envConfig.setAllowCreate(true);
envConfig.setTransactional(transactional);
Expand All @@ -143,28 +131,15 @@ private synchronized void initialize() throws BackendException {
envConfig.setConfigParam(EnvironmentConfig.ENV_RUN_CLEANER, "false");
}

// Open the environment
//Open the environment
environment = new Environment(directory, envConfig);

// Reopen any existing DB connections
for (String storeName : stores.keySet()) {
openDatabase(storeName, true);
}
} catch (DatabaseException e) {
throw new PermanentBackendException("Error during BerkeleyJE initialization: ", e);
}

}

private synchronized void reInitialize(DatabaseException exception) throws BackendException {
initialize();

if (exception instanceof ThreadInterruptedException) {
Thread.currentThread().interrupt();
throw (TraversalInterruptedException) new TraversalInterruptedException().initCause(exception);
}
}

@Override
public StoreFeatures getFeatures() {
return features;
Expand All @@ -175,7 +150,8 @@ public List<KeyRange> getLocalKeyPartition() throws BackendException {
throw new UnsupportedOperationException();
}

private BerkeleyJETx beginTransaction(final BaseTransactionConfig txCfg, boolean retryEnvironmentFailure) throws BackendException {
@Override
public BerkeleyJETx beginTransaction(final BaseTransactionConfig txCfg) throws BackendException {
try {
Transaction tx = null;

Expand Down Expand Up @@ -206,27 +182,15 @@ private BerkeleyJETx beginTransaction(final BaseTransactionConfig txCfg, boolean
}

return btx;
} catch (EnvironmentFailureException e) {
reInitialize(e);

if (retryEnvironmentFailure) {
return beginTransaction(txCfg, false);
}

throw new TemporaryBackendException("Could not start BerkeleyJE transaction", e);
} catch (DatabaseException e) {
throw new PermanentBackendException("Could not start BerkeleyJE transaction", e);
}
}

@Override
public BerkeleyJETx beginTransaction(final BaseTransactionConfig txCfg) throws BackendException {
return beginTransaction(txCfg, true);
}

private BerkeleyJEKeyValueStore openDatabase(String name, boolean force, boolean retryEnvironmentFailure) throws BackendException {
public BerkeleyJEKeyValueStore openDatabase(String name) throws BackendException {
Preconditions.checkNotNull(name);
if (stores.containsKey(name) && !force) {
if (stores.containsKey(name)) {
return stores.get(name);
}
try {
Expand All @@ -245,34 +209,13 @@ private BerkeleyJEKeyValueStore openDatabase(String name, boolean force, boolean
log.debug("Opened database {}", name);

BerkeleyJEKeyValueStore store = new BerkeleyJEKeyValueStore(name, db, this);
if (stores.containsKey(name)) {
stores.get(name).reopen(db);
} else {
stores.put(name, store);
}
stores.put(name, store);
return store;
} catch (EnvironmentFailureException e) {
reInitialize(e);

if (retryEnvironmentFailure) {
return openDatabase(name, force, false);
}

throw new TemporaryBackendException("Could not open BerkeleyJE data store", e);
} catch (DatabaseException e) {
throw new PermanentBackendException("Could not open BerkeleyJE data store", e);
}
}

private BerkeleyJEKeyValueStore openDatabase(String name, boolean force) throws BackendException {
return openDatabase(name, force, true);
}

@Override
public BerkeleyJEKeyValueStore openDatabase(String name) throws BackendException {
return openDatabase(name, false, true);
}

@Override
public void mutateMany(Map<String, KVMutation> mutations, StoreTransaction txh) throws BackendException {
for (Map.Entry<String,KVMutation> mutation : mutations.entrySet()) {
Expand Down Expand Up @@ -309,16 +252,18 @@ void removeDatabase(BerkeleyJEKeyValueStore db) {
log.debug("Removed database {}", name);
}

public void close(boolean force) throws BackendException {

@Override
public void close() throws BackendException {
if (environment != null) {
if (!force && !stores.isEmpty())
if (!stores.isEmpty())
throw new IllegalStateException("Cannot shutdown manager since some databases are still open");
try {
// TODO this looks like a race condition
//Wait just a little bit before closing so that independent transaction threads can clean up.
Thread.sleep(30);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
//Ignore
}
try {
environment.close();
Expand All @@ -329,11 +274,6 @@ public void close(boolean force) throws BackendException {

}

@Override
public void close() throws BackendException {
close(false);
}

private static final Transaction NULL_TRANSACTION = null;

@Override
Expand Down
Loading

0 comments on commit 872a475

Please sign in to comment.