Skip to content

Commit

Permalink
SOLR-17036: Update log lazy creates VersionBucket and highest field r…
Browse files Browse the repository at this point in the history
…emoved. (#2021)
  • Loading branch information
bruno-roustant committed Dec 5, 2023
1 parent 76bb8f2 commit 571bf71
Show file tree
Hide file tree
Showing 12 changed files with 42 additions and 540 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -751,13 +751,6 @@ public final void doSyncOrReplicateRecovery(SolrCore core) throws Exception {
}
}

// if replay was skipped (possibly to due pulling a full index from the leader),
// then we still need to update version bucket seeds after recovery
if (successfulRecovery && replayFuture == null) {
log.info("Updating version bucket highest from index after successful recovery.");
core.seedVersionBuckets();
}

if (log.isInfoEnabled()) {
log.info(
"Finished recovery process, successful=[{}] msTimeTaken={}",
Expand Down
20 changes: 0 additions & 20 deletions solr/core/src/java/org/apache/solr/core/SolrCore.java
Original file line number Diff line number Diff line change
Expand Up @@ -1199,10 +1199,6 @@ private SolrCore(
.initializeMetrics(solrMetricsContext, "directoryFactory");
}

// seed version buckets with max from index during core initialization ... requires a
// searcher!
seedVersionBuckets();

bufferUpdatesIfConstructing(coreDescriptor);

this.ruleExpiryLock = new ReentrantLock();
Expand Down Expand Up @@ -1238,22 +1234,6 @@ private SolrCore(
assert ObjectReleaseTracker.track(this);
}

public void seedVersionBuckets() {
UpdateHandler uh = getUpdateHandler();
if (uh != null && uh.getUpdateLog() != null) {
RefCounted<SolrIndexSearcher> newestSearcher = getRealtimeSearcher();
if (newestSearcher != null) {
try {
uh.getUpdateLog().seedBucketsWithHighestVersion(newestSearcher.get());
} finally {
newestSearcher.decref();
}
} else {
log.warn("No searcher available! Cannot seed version buckets with max from index.");
}
}
}

/** Set UpdateLog to buffer updates if the slice is in construction. */
private void bufferUpdatesIfConstructing(CoreDescriptor coreDescriptor) {

Expand Down
14 changes: 0 additions & 14 deletions solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexDeletionPolicy;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
Expand Down Expand Up @@ -104,7 +103,6 @@
import org.apache.solr.search.SolrIndexSearcher;
import org.apache.solr.security.AuthorizationContext;
import org.apache.solr.update.SolrIndexWriter;
import org.apache.solr.update.VersionInfo;
import org.apache.solr.util.NumberUtils;
import org.apache.solr.util.PropertiesInputStream;
import org.apache.solr.util.RefCounted;
Expand Down Expand Up @@ -730,18 +728,6 @@ public CoreReplicationAPI.IndexVersionResponse getIndexVersionResponse() throws
return rsp;
}

/**
* Retrieves the maximum version number from an index commit. NOTE: The commit <b>MUST</b> be
* reserved before calling this method
*/
private long getMaxVersion(IndexCommit commit) throws IOException {
try (DirectoryReader reader = DirectoryReader.open(commit)) {
IndexSearcher searcher = new IndexSearcher(reader);
VersionInfo vinfo = core.getUpdateHandler().getUpdateLog().getVersionInfo();
return Math.abs(vinfo.getMaxVersionFromIndex(searcher));
}
}

/**
* For configuration files, checksum of the file is included because, unlike index files, they may
* have same content but different timestamps.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,11 @@
import java.util.concurrent.locks.ReentrantLock;
import org.apache.solr.common.SolrException;

/**
* @lucene.internal
*/
/**
* This implementation uses lock and condition and will throw exception if it can't obtain the lock
* within <code>lockTimeoutMs</code>.
*
* @lucene.internal
*/
public class TimedVersionBucket extends VersionBucket {

Expand Down
73 changes: 0 additions & 73 deletions solr/core/src/java/org/apache/solr/update/UpdateLog.java
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@
import org.apache.solr.update.processor.UpdateRequestProcessorChain;
import org.apache.solr.util.LongSet;
import org.apache.solr.util.OrderedExecutor;
import org.apache.solr.util.RTimer;
import org.apache.solr.util.RefCounted;
import org.apache.solr.util.TestInjection;
import org.apache.solr.util.TimeOut;
Expand Down Expand Up @@ -222,7 +221,6 @@ public String toString() {
// This should only be used to initialize VersionInfo... the actual number of buckets may be
// rounded up to a power of two.
protected int numVersionBuckets;
protected Long maxVersionFromIndex = null;
protected boolean existOldBufferLog = false;

// keep track of deletes only... this is not updated on an add
Expand Down Expand Up @@ -1889,12 +1887,6 @@ public void run() {
// change the state while updates are still blocked to prevent races
state = State.ACTIVE;
if (finishing) {

// after replay, update the max from the index
log.info("Re-computing max version from index after log re-play.");
maxVersionFromIndex = null;
getMaxVersionFromIndex();

versionInfo.unblockUpdates();
}

Expand Down Expand Up @@ -2327,71 +2319,6 @@ public void clearLog(SolrCore core, PluginInfo ulogPluginInfo) {
}
}

public Long getCurrentMaxVersion() {
return maxVersionFromIndex;
}

// this method is primarily used for unit testing and is not part of the public API for this class
Long getMaxVersionFromIndex() {
RefCounted<SolrIndexSearcher> newestSearcher =
(uhandler != null && uhandler.core != null) ? uhandler.core.getRealtimeSearcher() : null;
if (newestSearcher == null)
throw new IllegalStateException("No searcher available to lookup max version from index!");

try {
seedBucketsWithHighestVersion(newestSearcher.get());
return getCurrentMaxVersion();
} finally {
newestSearcher.decref();
}
}

/** Used to seed all version buckets with the max value of the version field in the index. */
protected Long seedBucketsWithHighestVersion(
SolrIndexSearcher newSearcher, VersionInfo versions) {
Long highestVersion = null;
final RTimer timer = new RTimer();

try (RecentUpdates recentUpdates = getRecentUpdates()) {
long maxVersionFromRecent = recentUpdates.getMaxRecentVersion();
long maxVersionFromIndex = versions.getMaxVersionFromIndex(newSearcher);

long maxVersion = Math.max(maxVersionFromIndex, maxVersionFromRecent);
if (maxVersion == 0L) {
maxVersion = versions.getNewClock();
log.info(
"Could not find max version in index or recent updates, using new clock {}",
maxVersion);
}

// seed all version buckets with the highest value from recent and index
versions.seedBucketsWithHighestVersion(maxVersion);

highestVersion = maxVersion;
} catch (IOException ioExc) {
log.warn("Failed to determine the max value of the version field due to: ", ioExc);
}

if (debug) {
log.debug(
"Took {}ms to seed version buckets with highest version {}",
timer.getTime(),
highestVersion);
}

return highestVersion;
}

public void seedBucketsWithHighestVersion(SolrIndexSearcher newSearcher) {
log.debug("Looking up max value of version field to seed version buckets");
versionInfo.blockUpdates();
try {
maxVersionFromIndex = seedBucketsWithHighestVersion(newSearcher, versionInfo);
} finally {
versionInfo.unblockUpdates();
}
}

@SuppressForbidden(reason = "extends linkedhashmap")
private static class OldDeletesLinkedHashMap extends LinkedHashMap<BytesRef, LogPtr> {
private final int numDeletesToKeepInternal;
Expand Down
12 changes: 2 additions & 10 deletions solr/core/src/java/org/apache/solr/update/VersionBucket.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,13 @@
// TODO: store the highest possible in the index on a commit (but how to not block adds?)
// TODO: could also store highest possible in the transaction log after a commit.
// Or on a new index, just scan "version" for the max?
/**
* @lucene.internal
*/
/**
* The default implementation which uses the intrinsic object monitor. It uses less memory but
* ignores the <code>lockTimeoutMs</code>.
*
* @lucene.internal
*/
public class VersionBucket {
public long highest;

public void updateHighest(long val) {
if (highest != 0) {
highest = Math.max(highest, Math.abs(val));
}
}

@FunctionalInterface
public interface CheckedFunction<T, R> {
Expand Down
127 changes: 22 additions & 105 deletions solr/core/src/java/org/apache/solr/update/VersionInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,41 +19,32 @@
import static org.apache.solr.common.params.CommonParams.VERSION_FIELD;

import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.PointValues;
import org.apache.lucene.index.Terms;
import org.apache.lucene.queries.function.FunctionValues;
import org.apache.lucene.queries.function.ValueSource;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.util.BitUtil;
import org.apache.lucene.util.BytesRef;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.util.SuppressForbidden;
import org.apache.solr.index.SlowCompositeReaderWrapper;
import org.apache.solr.legacy.LegacyNumericUtils;
import org.apache.solr.schema.IndexSchema;
import org.apache.solr.schema.SchemaField;
import org.apache.solr.search.SolrIndexSearcher;
import org.apache.solr.util.RefCounted;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class VersionInfo {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final String SYS_PROP_BUCKET_VERSION_LOCK_TIMEOUT_MS =
"bucketVersionLockTimeoutMs";

private final UpdateLog ulog;
private final VersionBucket[] buckets;
private SchemaField versionField;
private final int numBuckets;
private volatile VersionBucket[] buckets;
private final Object bucketsSync = new Object();
private final SchemaField versionField;
final ReadWriteLock lock = new ReentrantReadWriteLock(true);

private int versionBucketLockTimeoutMs;
private final int versionBucketLockTimeoutMs;

/**
* Gets and returns the {@link org.apache.solr.common.params.CommonParams#VERSION_FIELD} from the
Expand Down Expand Up @@ -103,14 +94,7 @@ public VersionInfo(UpdateLog ulog, int nBuckets) {
.get("versionBucketLockTimeoutMs")
.intVal(
Integer.parseInt(System.getProperty(SYS_PROP_BUCKET_VERSION_LOCK_TIMEOUT_MS, "0")));
buckets = new VersionBucket[BitUtil.nextHighestPowerOfTwo(nBuckets)];
for (int i = 0; i < buckets.length; i++) {
if (versionBucketLockTimeoutMs > 0) {
buckets[i] = new TimedVersionBucket();
} else {
buckets[i] = new VersionBucket();
}
}
numBuckets = BitUtil.nextHighestPowerOfTwo(nBuckets);
}

public int getVersionBucketLockTimeoutMs() {
Expand Down Expand Up @@ -205,11 +189,25 @@ public VersionBucket bucket(int hash) {
// Make sure high bits are moved down, since only the low bits will matter.
// int h = hash + (hash >>> 8) + (hash >>> 16) + (hash >>> 24);
// Assume good hash codes for now.

int slot = hash & (buckets.length - 1);
int slot = hash & (numBuckets - 1);
if (buckets == null) {
synchronized (bucketsSync) {
if (buckets == null) {
buckets = createVersionBuckets();
}
}
}
return buckets[slot];
}

private VersionBucket[] createVersionBuckets() {
VersionBucket[] buckets = new VersionBucket[numBuckets];
for (int i = 0; i < buckets.length; i++) {
buckets[i] = versionBucketLockTimeoutMs > 0 ? new TimedVersionBucket() : new VersionBucket();
}
return buckets;
}

public Long lookupVersion(BytesRef idBytes) {
return ulog.lookupVersion(idBytes);
}
Expand Down Expand Up @@ -246,85 +244,4 @@ public Long getVersionFromIndex(BytesRef idBytes) {
}
}
}

/** Returns the highest version from the index, or 0L if no versions can be found in the index. */
@SuppressWarnings({"unchecked"})
public Long getMaxVersionFromIndex(IndexSearcher searcher) throws IOException {

final String versionFieldName = versionField.getName();

log.debug(
"Refreshing highest value of {} for {} version buckets from index",
versionFieldName,
buckets.length);
// if indexed, then we have terms to get the max from
if (versionField.indexed()) {
if (versionField.getType().isPointField()) {
return getMaxVersionFromIndexedPoints(searcher);
} else {
return getMaxVersionFromIndexedTerms(searcher);
}
}
// else: not indexed, use docvalues via value source ...

long maxVersionInIndex = 0L;
ValueSource vs = versionField.getType().getValueSource(versionField, null);
Map<Object, Object> funcContext = ValueSource.newContext(searcher);
vs.createWeight(funcContext, searcher);
// TODO: multi-thread this
for (LeafReaderContext ctx : searcher.getTopReaderContext().leaves()) {
int maxDoc = ctx.reader().maxDoc();
FunctionValues fv = vs.getValues(funcContext, ctx);
for (int doc = 0; doc < maxDoc; doc++) {
long v = fv.longVal(doc);
maxVersionInIndex = Math.max(v, maxVersionInIndex);
}
}
return maxVersionInIndex;
}

public void seedBucketsWithHighestVersion(long highestVersion) {
for (int i = 0; i < buckets.length; i++) {
// should not happen, but in case other threads are calling updateHighest on the version
// bucket
synchronized (buckets[i]) {
if (buckets[i].highest < highestVersion) buckets[i].highest = highestVersion;
}
}
}

private long getMaxVersionFromIndexedTerms(IndexSearcher searcher) throws IOException {
assert !versionField.getType().isPointField();

final String versionFieldName = versionField.getName();
final LeafReader leafReader = SlowCompositeReaderWrapper.wrap(searcher.getIndexReader());
final Terms versionTerms = leafReader.terms(versionFieldName);
final Long max = (versionTerms != null) ? LegacyNumericUtils.getMaxLong(versionTerms) : null;
if (null != max) {
log.debug("Found MAX value {} from Terms for {} in index", max, versionFieldName);
return max.longValue();
}
return 0L;
}

private long getMaxVersionFromIndexedPoints(IndexSearcher searcher) throws IOException {
assert versionField.getType().isPointField();

final String versionFieldName = versionField.getName();
final byte[] maxBytes =
PointValues.getMaxPackedValue(searcher.getIndexReader(), versionFieldName);
if (null == maxBytes) {
return 0L;
}
final Object maxObj = versionField.getType().toObject(versionField, new BytesRef(maxBytes));
if (null == maxObj || !(maxObj instanceof Number)) {
// HACK: aparently nothing asserts that the FieldType is numeric (let alone a Long???)
log.error("Unable to convert MAX byte[] from Points for {} in index", versionFieldName);
return 0L;
}

final long max = ((Number) maxObj).longValue();
log.debug("Found MAX value {} from Points for {} in index", max, versionFieldName);
return max;
}
}
Loading

0 comments on commit 571bf71

Please sign in to comment.