Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: delta transitions in shared-memory mode #615

Open
wants to merge 33 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
1edaa13
Start adding delta support
Sunjeet May 20, 2023
9d98237
Continue adding delta support
Sunjeet May 20, 2023
0a73dd9
GapEncodedVariableLengthIntegerReader.java supports shared memory mode
Sunjeet May 20, 2023
798ad32
BlobByteBuffer supports putLong
Sunjeet May 20, 2023
67ee931
EncodedLongBuffer supports setElementValue
Sunjeet May 20, 2023
49429d4
EncodedLongBuffer supports testCopyBitRange
Sunjeet May 21, 2023
5dea324
EncodedLongBuffer supports increment/incrementMany
Sunjeet May 21, 2023
b3fe5e2
EncodedLongBuffer- add unit test for copy small bit range
Sunjeet May 21, 2023
359002d
EncodedLongBuffer implements clearElementValue
Sunjeet May 21, 2023
bf460ae
Run it up for simple data model
Sunjeet May 21, 2023
6fa9a07
Implement destroy for BlobByteBuffer, but don't invoke it yet
Sunjeet May 21, 2023
7d669a6
Refactor fixed length data provisioning
Sunjeet May 21, 2023
3446435
Cleanup before touching non object types
Sunjeet May 21, 2023
ae505e6
Support remaining fixed length types- list, set, map
Sunjeet May 21, 2023
55b655c
Shared memory mode delta transitions for variable length types
Sunjeet May 22, 2023
1bce619
Cleanup
Sunjeet May 22, 2023
770fac8
Bugfix for empty transition file being mmapped, other refactor/cleanup
Sunjeet Jun 10, 2023
46dfd6b
Gap encoded combined removed ordinals applicable conditionally- alway…
Sunjeet Jun 12, 2023
730cec9
Delta application performance- bulk copy fixed length data- but only …
Sunjeet Jun 13, 2023
a75f7a1
Delta target file- add schema name and shard num for diagnostic, disa…
Sunjeet Jun 17, 2023
f66b074
Take a stab at unmap/close
Sunjeet Jun 18, 2023
4a91f9b
Some minor fixes in file cleanup
Sunjeet Jun 18, 2023
37e9084
lifecycle staging files and cleanup
Sunjeet Jun 19, 2023
8770271
Delta transitions files under java.io.tmpdir
Sunjeet Jun 21, 2023
ab45e0d
Change unexpected read from exception to warn
Sunjeet Jun 21, 2023
ea9ef51
Fix for encoded long buffer setElementValue getElementValue for bytes…
Sunjeet Jun 22, 2023
5fabbd3
Trying removing explicit call to gc
Sunjeet Jun 22, 2023
013ca0f
Employ sun misc Cleaner for unmap - java8 only
Sunjeet Jun 22, 2023
5f91441
Fewer threads when computing history
Sunjeet Jun 23, 2023
6001a79
Patch allocated target file length
Sunjeet Jun 23, 2023
202cc43
Cleanup logs
Sunjeet Jun 23, 2023
ca84256
Temporarily disable cleaner
Sunjeet Jun 23, 2023
97a1030
Revert "Temporarily disable cleaner"
Sunjeet Jun 23, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ private void exploreOrdinals(HollowReadStateEngine readStateEngine) {
System.out.println("CUSTOM_VERSION_TAG= " + readStateEngine.getHeaderTags().get(CUSTOM_VERSION_TAG));
for (HollowTypeReadState typeReadState : readStateEngine.getTypeStates()) {
BitSet populatedOrdinals = typeReadState.getPopulatedOrdinals();
System.out.println("SNAP: PopulatedOrdinals= " + populatedOrdinals);
// System.out.println("SNAP: PopulatedOrdinals= " + populatedOrdinals);
int ordinal = populatedOrdinals.nextSetBit(0);
while (ordinal != -1) {
HollowObjectTypeReadState o = (HollowObjectTypeReadState) typeReadState;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,16 +206,17 @@ private void applyDeltaOnlyPlan(HollowUpdatePlan updatePlan, HollowConsumer.Refr
}

private void applyDeltaTransition(HollowConsumer.Blob blob, boolean isSnapshotPlan, HollowConsumer.RefreshListener[] refreshListeners) throws Throwable {
if (!memoryMode.equals(MemoryMode.ON_HEAP)) {
LOG.warning("Skipping delta transition in shared-memory mode");
return;
}
LOG.info(String.format("Attempting delta transition from v %s to v %s in %s mode",
blob.getFromVersion(), blob.getToVersion(), memoryMode));

try (HollowBlobInput in = HollowBlobInput.modeBasedSelector(memoryMode, blob);
OptionalBlobPartInput optionalPartIn = blob.getOptionalBlobPartInputs()) {
applyStateEngineTransition(in, optionalPartIn, blob, refreshListeners);

if(objLongevityConfig.enableLongLivedObjectSupport()) {
if (!memoryMode.equals(MemoryMode.ON_HEAP)) {
throw new UnsupportedOperationException("Shared memory mode doesn't support object longevity... yet");
}
HollowDataAccess previousDataAccess = currentAPI.getDataAccess();
HollowHistoricalStateDataAccess priorState = new HollowHistoricalStateCreator(null).createBasedOnNewDelta(currentVersion, stateEngine);
HollowProxyDataAccess newDataAccess = new HollowProxyDataAccess();
Expand Down Expand Up @@ -245,7 +246,11 @@ private void applyDeltaTransition(HollowConsumer.Blob blob, boolean isSnapshotPl

} catch(Throwable t) {
failedTransitionTracker.markFailedTransition(blob);
LOG.warning("SNAP: Delta transition encountered exception: " + t);
throw t;
} finally {
LOG.info(String.format("Delta transition completed from v %s to v %s in %s mode",
blob.getFromVersion(), blob.getToVersion(), memoryMode));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,30 +18,64 @@

import com.netflix.hollow.core.memory.encoding.BlobByteBuffer;
import com.netflix.hollow.core.read.HollowBlobInput;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.util.logging.Logger;

/**
* {@code BlobByteBuffer} based implementation of variable length byte data that only supports read.
* {@code BlobByteBuffer} based implementation of variable length byte data that only supports read. // TODO: update when supports write
*/
public class EncodedByteBuffer implements VariableLengthData {
private static final Logger LOG = Logger.getLogger(EncodedByteBuffer.class.getName());

/** View over the underlying mmap-ed blob; null until {@code loadFrom} is called or after {@code destroy}. */
private BlobByteBuffer bufferView;
private long size;

/** Staging file backing this buffer; deleted by {@code destroy()}. May be null when no file is managed. */
private final File managedFile;
// Diagnostic flag: set on the first effective destroy() so that a repeat destroy() can be detected and warned about.
private boolean destroyActionHasBeenTakenBeforeDiag = false;

/**
 * Creates an empty buffer with no managed staging file (backward-compatible no-arg form).
 */
public EncodedByteBuffer() {
    this(null);
}

/**
 * Creates an empty buffer. If {@code managedFile} is non-null it is the staging file that
 * backs this buffer and will be deleted when {@link #destroy()} is invoked.
 *
 * @param managedFile the staging file to manage, or null if none
 */
public EncodedByteBuffer(File managedFile) {
    this.managedFile = managedFile;
    this.size = 0;
}

public BlobByteBuffer getBufferView() {
    return bufferView;
}

/**
 * Releases the underlying memory mapping and deletes the managed staging file, if any.
 * Safe to call more than once: a repeat call only logs a warning.
 *
 * @throws IOException if the staging file could not be deleted
 */
public void destroy() throws IOException {
    if (bufferView != null) {
        bufferView.unmapBlob();
        destroyActionHasBeenTakenBeforeDiag = true;
    } else {
        if (destroyActionHasBeenTakenBeforeDiag) {
            LOG.warning("SNAP: destroy() called on EncodedByteBuffer thats already been destroyed previously");
        }
    }
    bufferView = null;
    if (managedFile != null) {
        // deleteIfExists (not delete): a repeat destroy() must not fail with NoSuchFileException
        // once the file has already been removed — the warning path above shows repeats are expected
        Files.deleteIfExists(managedFile.toPath());
    }
}

@Override
public byte get(long index) {
if (index >= this.size) {
throw new IllegalStateException();
if (index >= this.size) { // SNAP: TODO: realized in transformer
if (index >= this.size + Long.BYTES) {
LOG.warning(String.format("SNAP: unexpected get from EncodedByteBuffer: index=%s, size=%s", index, size));
}
}

byte retVal = this.bufferView.getByte(this.bufferView.position() + index);
return retVal;
}

/**
 * Bulk-reads {@code len} bytes starting at {@code index} (relative to this view's start
 * position) into {@code bytes}; returns whatever count the underlying buffer reports.
 */
public int getBytes(long index, long len, byte[] bytes) {
    long absolutePos = bufferView.position() + index;
    return bufferView.getBytes(absolutePos, len, bytes, true);
}

/**
* {@inheritDoc}
* This is achieved by initializing a {@code BlobByteBuffer} that is a view on the underlying {@code BlobByteBuffer}
Expand All @@ -59,16 +93,34 @@ public void loadFrom(HollowBlobInput in, long length) throws IOException {

/**
 * Unsupported: an {@code EncodedByteBuffer} is a read-only view.
 *
 * @throws UnsupportedOperationException always
 */
@Override
public void copy(ByteData src, long srcPos, long destPos, long length) {
    throw new UnsupportedOperationException("Operation not supported in shared-memory mode - EncodedByteBuffers are read-only");
}

@Override
public void orderedCopy(VariableLengthData src, long srcPos, long destPos, long length) {
throw new UnsupportedOperationException("Operation not supported in shared-memory mode");
public void orderedCopy(VariableLengthData src, long srcPos, long destPos, long length) throws IOException {
throw new UnsupportedOperationException("Underlying data can only be mutated using " + VariableLengthDataFactory.StagedVariableLengthData.class.getName());
}

/** @return the size in bytes of the data held by this buffer */
@Override
public long size() {
return size;
}

/**
 * Unsupported: an {@code EncodedByteBuffer} is read-only.
 *
 * @throws UnsupportedOperationException always
 */
@Override
public void set(long index, byte value) {
throw new UnsupportedOperationException("Operation not supported in shared-memory mode");
}

/**
 * Unsupported in shared-memory mode: this operation backs the type filter feature, which
 * is not available for {@code EncodedByteBuffer}.
 *
 * @param os the output stream that would have been written to
 * @param startPosition the position that copying would have begun from
 * @param len the length of the data that would have been copied
 * @throws UnsupportedOperationException always
 */
@Override
public void writeTo(OutputStream os, long startPosition, long len) throws IOException {
throw new UnsupportedOperationException("Not supported for shared memory mode, supports the type filter feature");
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
package com.netflix.hollow.core.memory;

import static com.netflix.hollow.core.memory.encoding.BlobByteBuffer.MAX_SINGLE_BUFFER_CAPACITY;

import com.netflix.hollow.core.memory.encoding.EncodedLongBuffer;
import com.netflix.hollow.core.memory.encoding.FixedLengthElementArray;
import com.netflix.hollow.core.memory.encoding.VarInt;
import com.netflix.hollow.core.memory.pool.ArraySegmentRecycler;
import com.netflix.hollow.core.read.HollowBlobInput;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.logging.Logger;

public class FixedLengthDataFactory {
Expand All @@ -22,11 +27,46 @@ public static FixedLengthData get(HollowBlobInput in, MemoryMode memoryMode, Arr
}
}

public static void destroy(FixedLengthData fld, ArraySegmentRecycler memoryRecycler) {
/**
 * Allocates fixed-length storage sized from the blob input: reads a var-long count of
 * 64-bit longs and delegates to {@link #allocate(long, MemoryMode, ArraySegmentRecycler, String)}.
 * (Write-path helper; currently unused.)
 */
public static FixedLengthData allocate(HollowBlobInput in,
        MemoryMode memoryMode, ArraySegmentRecycler memoryRecycler,
        String fileName) throws IOException {
    long bitsRequired = VarInt.readVLong(in) << 6; // 64 bits per long
    return allocate(bitsRequired, memoryMode, memoryRecycler, fileName);
}

/**
 * Allocates fixed-length storage for {@code numBits} bits: an on-heap
 * {@code FixedLengthElementArray} in ON_HEAP mode, otherwise an {@code EncodedLongBuffer}
 * backed by a newly provisioned file of the required size.
 *
 * @param numBits number of bits to provision (may be 0)
 * @param memoryMode on-heap vs shared-memory mode
 * @param memoryRecycler recycler used for the on-heap allocation path
 * @param fileName path of the target file used in shared-memory mode
 * @throws IOException if the backing file could not be provisioned or mapped
 */
public static FixedLengthData allocate(long numBits, MemoryMode memoryMode, ArraySegmentRecycler memoryRecycler,
                                       String fileName) throws IOException {
    if (memoryMode.equals(MemoryMode.ON_HEAP)) {
        return new FixedLengthElementArray(memoryRecycler, numBits);
    } else {
        // guard numBits == 0: ((0 - 1) >>> 6) + 1 would wrap to 2^58 longs and provision a huge file
        long numLongs = numBits == 0 ? 0 : ((numBits - 1) >>> 6) + 1;
        long numBytes = numLongs << 3;
        // add Long.BYTES to provisioned file size to accommodate unaligned read starting offset in last long
        File targetFile = provisionTargetFile(numBytes + Long.BYTES, fileName);
        try (HollowBlobInput targetBlob = HollowBlobInput.randomAccess(targetFile, MAX_SINGLE_BUFFER_CAPACITY)) {
            return EncodedLongBuffer.newFrom(targetBlob, numLongs, targetFile); // TODO: test with different single buffer capacities
        }
    }
}

/**
 * Creates (or truncates/extends) a file at {@code fileName} of exactly {@code numBytes}
 * bytes, to serve as the mmap target (one file per shard per type).
 *
 * @param numBytes desired file length in bytes
 * @param fileName path of the file to provision
 * @return the provisioned file
 * @throws IOException if the file could not be created or sized
 */
static File provisionTargetFile(long numBytes, String fileName) throws IOException {
    File targetFile = new File(fileName);
    // try-with-resources: the original closed the RandomAccessFile manually and would have
    // leaked the file descriptor if setLength threw
    try (RandomAccessFile raf = new RandomAccessFile(targetFile, "rw")) {
        raf.setLength(numBytes);
    }
    return targetFile;
}

/**
 * Releases the given fixed-length data: on-heap arrays are returned to the recycler,
 * while shared-memory {@code EncodedLongBuffer}s unmap and delete their backing file.
 *
 * @param fld the fixed-length data to destroy
 * @param memoryRecycler recycler to return on-heap segments to
 * @throws IOException if an EncodedLongBuffer's backing resources could not be released
 * @throws UnsupportedOperationException for unknown FixedLengthData implementations
 */
public static void destroy(FixedLengthData fld, ArraySegmentRecycler memoryRecycler) throws IOException {
    if (fld instanceof FixedLengthElementArray) {
        ((FixedLengthElementArray) fld).destroy(memoryRecycler);
    } else if (fld instanceof EncodedLongBuffer) {
        ((EncodedLongBuffer) fld).destroy();
    } else {
        throw new UnsupportedOperationException("Unknown type");
    }
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package com.netflix.hollow.core.memory;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Random;

/**
 * Helpers for naming the staging files that back fixed/variable length data in
 * shared-memory mode. Filenames embed the type, shard number, a timestamp and a random
 * suffix so that concurrent consumers do not collide.
 */
public class MemoryFileUtil {

    // Cached: DateTimeFormatter is immutable and thread-safe, so there is no need to
    // rebuild it on every call.
    // NOTE(review): "HH:mm" puts ':' into the filename, which is invalid on Windows —
    // confirm target platforms before relying on this format.
    private static final DateTimeFormatter TIMESTAMP_FORMAT = DateTimeFormatter.ofPattern("MM-dd HH:mm");

    private MemoryFileUtil() {
        // static utility class — not instantiable
    }

    /** @return the directory where staging files are placed (java.io.tmpdir) */
    public static String filepath() {
        return System.getProperty("java.io.tmpdir");
    }

    // whichData is null for object types, and listPointerData, listElementData, mapEntryData etc. for collection types
    public static String fixedLengthDataFilename(String type, String whichData, int shardNo) {
        return "hollow-fixedLengthData-"
                + (whichData != null ? whichData + "_" : "")
                + type + "_"
                + shardNo + "_"
                + LocalDateTime.now().format(TIMESTAMP_FORMAT) + "_"
                + new Random().nextInt();
    }

    public static String varLengthDataFilename(String type, String field, int shardNo) {
        return "hollow-varLengthData-"
                + field + "_"
                + type + "_"
                + shardNo + "_"
                + LocalDateTime.now().format(TIMESTAMP_FORMAT) + "_"
                + new Random().nextInt();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ public SegmentedByteArray(ArraySegmentRecycler memoryRecycler) {
* @param index the index
* @param value the byte value
*/
@Override
public void set(long index, byte value) {
int segmentIndex = (int)(index >> log2OfSegmentSize);
ensureCapacity(segmentIndex);
Expand Down Expand Up @@ -236,6 +237,7 @@ public void loadFrom(HollowBlobInput is, long length) throws IOException {
* @param len the length of the data to copy
* @throws IOException if the write to the output stream could not be performed
*/
@Override
public void writeTo(OutputStream os, long startPosition, long len) throws IOException {
int segmentSize = 1 << log2OfSegmentSize;
int remainingBytesInSegment = segmentSize - (int)(startPosition & bitmask);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.netflix.hollow.core.read.HollowBlobInput;
import java.io.IOException;
import java.io.OutputStream;

/**
* Conceptually this can be thought of as a single byte array or buffer of undefined length. It will grow automatically
Expand Down Expand Up @@ -38,11 +39,15 @@ public interface VariableLengthData extends ByteData {
* @param destPos position in destination to begin copying to
* @param length length of data to copy in bytes
*/
void orderedCopy(VariableLengthData src, long srcPos, long destPos, long length) throws IOException;

/**
 * Data size in bytes
 * @return size in bytes
 */
long size();

/**
 * Write a single byte at the given index.
 *
 * @param index position in bytes
 * @param value byte value to store
 */
void set(long index, byte value);

/**
 * Write a portion of this data to an output stream.
 *
 * @param os the output stream to write to
 * @param startPosition the position in bytes to begin copying from
 * @param len the number of bytes to copy
 * @throws IOException if the write to the output stream could not be performed
 */
void writeTo(OutputStream os, long startPosition, long len) throws IOException;
}
Loading