Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DO NOT MERGE] Report per-type added/removed ordinal count in announcement metadata #550

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import com.netflix.hollow.core.util.HollowObjectHashCodeFinder;
import com.netflix.hollow.core.util.HollowWriteStateCreator;
import com.netflix.hollow.core.write.HollowBlobWriter;
import com.netflix.hollow.core.write.HollowTypeWriteState;
import com.netflix.hollow.core.write.HollowWriteStateEngine;
import com.netflix.hollow.core.write.objectmapper.HollowObjectMapper;
import com.netflix.hollow.core.write.objectmapper.RecordPrimaryKey;
Expand All @@ -50,8 +51,11 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
Expand Down Expand Up @@ -839,7 +843,12 @@ private void announce(ProducerListeners listeners, HollowProducer.ReadState read
"Fail the announcement because current producer is not primary (aka leader)");
throw new HollowProducer.NotPrimaryMidCycleException("Announcement failed primary (aka leader) check");
}
announcer.announce(readState.getVersion(), readState.getStateEngine().getHeaderTags());

Map<String, String> announceTags = new HashMap();
announceTags.putAll(changeCounts(getWriteEngine())); // change counts
announceTags.putAll(readState.getStateEngine().getHeaderTags()); // blob headers

announcer.announce(readState.getVersion(), announceTags);
} finally {
singleProducerEnforcer.unlock();
}
Expand All @@ -853,6 +862,35 @@ private void announce(ProducerListeners listeners, HollowProducer.ReadState read
}
}

public Map<String, String> changeCounts(HollowWriteStateEngine writeStateEngine) {
OptionalLong totalAdded = OptionalLong.empty();
OptionalLong totalRemoved = OptionalLong.empty();
Map<String, String> changeCounts = new HashMap();

for (HollowTypeWriteState typeState : writeStateEngine.getOrderedTypeStates()) {
if (typeState.getDeltaAddedOrdinalCount().isPresent()) {
int added = typeState.getDeltaAddedOrdinalCount().getAsInt();
changeCounts.put("hollow.ordinals.added.type." + typeState.getSchema().getName(),
String.valueOf(added));
totalAdded = OptionalLong.of(totalAdded.isPresent()
? (totalAdded.getAsLong() + added)
: added);
}
if (typeState.getDeltaRemovedOrdinalCount().isPresent()) {
int removed = typeState.getDeltaRemovedOrdinalCount().getAsInt();
changeCounts.put("hollow.ordinals.removed.type." + typeState.getSchema().getName(),
String.valueOf(removed));
totalRemoved = OptionalLong.of(totalRemoved.isPresent()
? (totalRemoved.getAsLong() + removed)
: removed);
}
}
totalAdded.ifPresent(v -> changeCounts.put("hollow.ordinals.added.total", String.valueOf(v)));
totalRemoved.ifPresent(v -> changeCounts.put("hollow.ordinals.removed.total", String.valueOf(v)));

return changeCounts;
}

static final class Artifacts {
HollowProducer.Blob snapshot = null;
HollowProducer.Blob delta = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package com.netflix.hollow.core.write;

import com.netflix.hollow.api.consumer.HollowConsumer;
import com.netflix.hollow.api.consumer.HollowConsumer.Blob.BlobType;
import com.netflix.hollow.core.memory.ByteData;
import com.netflix.hollow.core.memory.ByteDataArray;
import com.netflix.hollow.core.memory.ThreadSafeBitSet;
Expand All @@ -25,6 +27,7 @@
import com.netflix.hollow.core.schema.HollowListSchema;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.OptionalInt;

public class HollowListTypeWriteState extends HollowTypeWriteState {

Expand Down Expand Up @@ -232,7 +235,7 @@ private void writeSnapshotShard(DataOutputStream os, int shardNumber) throws IOE

@Override
public void calculateDelta() {
calculateDelta(previousCyclePopulated, currentCyclePopulated);
calculateDelta(previousCyclePopulated, currentCyclePopulated, BlobType.DELTA);
}

@Override
Expand All @@ -242,15 +245,15 @@ public void writeDelta(DataOutputStream dos) throws IOException {

@Override
public void calculateReverseDelta() {
calculateDelta(currentCyclePopulated, previousCyclePopulated);
calculateDelta(currentCyclePopulated, previousCyclePopulated, BlobType.REVERSE_DELTA);
}

@Override
public void writeReverseDelta(DataOutputStream dos) throws IOException {
writeCalculatedDelta(dos);
}

private void calculateDelta(ThreadSafeBitSet fromCyclePopulated, ThreadSafeBitSet toCyclePopulated) {
private void calculateDelta(ThreadSafeBitSet fromCyclePopulated, ThreadSafeBitSet toCyclePopulated, BlobType blobType) {
maxOrdinal = ordinalMap.maxOrdinal();

numListsInDelta = new int[numShards];
Expand All @@ -260,7 +263,7 @@ private void calculateDelta(ThreadSafeBitSet fromCyclePopulated, ThreadSafeBitSe
elementArray = new FixedLengthElementArray[numShards];
deltaAddedOrdinals = new ByteDataArray[numShards];
deltaRemovedOrdinals = new ByteDataArray[numShards];

ThreadSafeBitSet deltaAdditions = toCyclePopulated.andNot(fromCyclePopulated);

int shardMask = numShards - 1;
Expand All @@ -287,6 +290,8 @@ private void calculateDelta(ThreadSafeBitSet fromCyclePopulated, ThreadSafeBitSe
long elementCounter[] = new long[numShards];
int previousRemovedOrdinal[] = new int[numShards];
int previousAddedOrdinal[] = new int[numShards];
int addedCount = 0;
int removedCount = 0;

for(int ordinal=0;ordinal<=maxOrdinal;ordinal++) {
int shardNumber = ordinal & shardMask;
Expand All @@ -309,13 +314,20 @@ private void calculateDelta(ThreadSafeBitSet fromCyclePopulated, ThreadSafeBitSe

int shardOrdinal = ordinal / numShards;
VarInt.writeVInt(deltaAddedOrdinals[shardNumber], shardOrdinal - previousAddedOrdinal[shardNumber]);
addedCount++;
previousAddedOrdinal[shardNumber] = shardOrdinal;
} else if(fromCyclePopulated.get(ordinal) && !toCyclePopulated.get(ordinal)) {
int shardOrdinal = ordinal / numShards;
VarInt.writeVInt(deltaRemovedOrdinals[shardNumber], shardOrdinal - previousRemovedOrdinal[shardNumber]);
removedCount++;
previousRemovedOrdinal[shardNumber] = shardOrdinal;
}
}

if (blobType.equals(HollowConsumer.Blob.BlobType.DELTA)) {
deltaAddedOrdinalCount = OptionalInt.of(addedCount);
deltaRemovedOrdinalCount = OptionalInt.of(removedCount);
}
}

private void writeCalculatedDelta(DataOutputStream os) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package com.netflix.hollow.core.write;

import com.netflix.hollow.api.consumer.HollowConsumer;
import com.netflix.hollow.api.consumer.HollowConsumer.Blob.BlobType;
import com.netflix.hollow.core.memory.ByteData;
import com.netflix.hollow.core.memory.ByteDataArray;
import com.netflix.hollow.core.memory.ThreadSafeBitSet;
Expand All @@ -26,6 +28,7 @@
import com.netflix.hollow.core.schema.HollowMapSchema;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.OptionalInt;

public class HollowMapTypeWriteState extends HollowTypeWriteState {

Expand Down Expand Up @@ -317,7 +320,7 @@ private void writeSnapshotShard(DataOutputStream os, int shardNumber) throws IOE

@Override
public void calculateDelta() {
calculateDelta(previousCyclePopulated, currentCyclePopulated);
calculateDelta(previousCyclePopulated, currentCyclePopulated, BlobType.DELTA);
}

@Override
Expand All @@ -327,15 +330,15 @@ public void writeDelta(DataOutputStream dos) throws IOException {

@Override
public void calculateReverseDelta() {
calculateDelta(currentCyclePopulated, previousCyclePopulated);
calculateDelta(currentCyclePopulated, previousCyclePopulated, BlobType.REVERSE_DELTA);
}

@Override
public void writeReverseDelta(DataOutputStream dos) throws IOException {
writeCalculatedDelta(dos);
}

private void calculateDelta(ThreadSafeBitSet fromCyclePopulated, ThreadSafeBitSet toCyclePopulated) {
private void calculateDelta(ThreadSafeBitSet fromCyclePopulated, ThreadSafeBitSet toCyclePopulated, BlobType blobType) {
maxOrdinal = ordinalMap.maxOrdinal();
int bitsPerMapFixedLengthPortion = bitsPerMapSizeValue + bitsPerMapPointer;
int bitsPerMapEntry = bitsPerKeyElement + bitsPerValueElement;
Expand All @@ -346,7 +349,7 @@ private void calculateDelta(ThreadSafeBitSet fromCyclePopulated, ThreadSafeBitSe
entryData = new FixedLengthElementArray[numShards];
deltaAddedOrdinals = new ByteDataArray[numShards];
deltaRemovedOrdinals = new ByteDataArray[numShards];

ThreadSafeBitSet deltaAdditions = toCyclePopulated.andNot(fromCyclePopulated);

int shardMask = numShards - 1;
Expand Down Expand Up @@ -380,6 +383,8 @@ private void calculateDelta(ThreadSafeBitSet fromCyclePopulated, ThreadSafeBitSe
if(getSchema().getHashKey() != null)
primaryKeyHasher = new HollowWriteStateEnginePrimaryKeyHasher(getSchema().getHashKey(), getStateEngine());

int addedCount = 0;
int removedCount = 0;
for(int ordinal=0;ordinal<=maxOrdinal;ordinal++) {
int shardNumber = ordinal & shardMask;
if(deltaAdditions.get(ordinal)) {
Expand Down Expand Up @@ -430,13 +435,20 @@ private void calculateDelta(ThreadSafeBitSet fromCyclePopulated, ThreadSafeBitSe

int shardOrdinal = ordinal / numShards;
VarInt.writeVInt(deltaAddedOrdinals[shardNumber], shardOrdinal - previousAddedOrdinal[shardNumber]);
addedCount++;
previousAddedOrdinal[shardNumber] = shardOrdinal;
} else if(fromCyclePopulated.get(ordinal) && !toCyclePopulated.get(ordinal)) {
int shardOrdinal = ordinal / numShards;
VarInt.writeVInt(deltaRemovedOrdinals[shardNumber], shardOrdinal - previousRemovedOrdinal[shardNumber]);
removedCount++;
previousRemovedOrdinal[shardNumber] = shardOrdinal;
}
}

if (blobType.equals(HollowConsumer.Blob.BlobType.DELTA)) {
deltaAddedOrdinalCount = OptionalInt.of(addedCount);
deltaRemovedOrdinalCount = OptionalInt.of(removedCount);
}
}

private void writeCalculatedDelta(DataOutputStream os) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package com.netflix.hollow.core.write;

import com.netflix.hollow.api.consumer.HollowConsumer;
import com.netflix.hollow.api.consumer.HollowConsumer.Blob.BlobType;
import com.netflix.hollow.core.memory.ByteData;
import com.netflix.hollow.core.memory.ByteDataArray;
import com.netflix.hollow.core.memory.ThreadSafeBitSet;
Expand All @@ -26,6 +28,7 @@
import com.netflix.hollow.core.schema.HollowObjectSchema.FieldType;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.OptionalInt;

public class HollowObjectTypeWriteState extends HollowTypeWriteState {

Expand Down Expand Up @@ -237,7 +240,7 @@ private void writeSnapshotShard(DataOutputStream os, int shardNumber) throws IOE

@Override
public void calculateDelta() {
calculateDelta(previousCyclePopulated, currentCyclePopulated);
calculateDelta(previousCyclePopulated, currentCyclePopulated, BlobType.DELTA);
}

@Override
Expand All @@ -247,15 +250,15 @@ public void writeDelta(DataOutputStream dos) throws IOException {

@Override
public void calculateReverseDelta() {
calculateDelta(currentCyclePopulated, previousCyclePopulated);
calculateDelta(currentCyclePopulated, previousCyclePopulated, BlobType.REVERSE_DELTA);
}

@Override
public void writeReverseDelta(DataOutputStream dos) throws IOException {
writeCalculatedDelta(dos);
}

private void calculateDelta(ThreadSafeBitSet fromCyclePopulated, ThreadSafeBitSet toCyclePopulated) {
private void calculateDelta(ThreadSafeBitSet fromCyclePopulated, ThreadSafeBitSet toCyclePopulated, BlobType blobType) {
maxOrdinal = ordinalMap.maxOrdinal();
int numBitsPerRecord = fieldStats.getNumBitsPerRecord();

Expand All @@ -267,9 +270,8 @@ private void calculateDelta(ThreadSafeBitSet fromCyclePopulated, ThreadSafeBitSe
varLengthByteArrays = new ByteDataArray[numShards][];
recordBitOffset = new long[numShards];
int numAddedRecordsInShard[] = new int[numShards];

int shardMask = numShards - 1;

int addedOrdinal = deltaAdditions.nextSetBit(0);
while(addedOrdinal != -1) {
numAddedRecordsInShard[addedOrdinal & shardMask]++;
Expand All @@ -286,20 +288,29 @@ private void calculateDelta(ThreadSafeBitSet fromCyclePopulated, ThreadSafeBitSe
int previousRemovedOrdinal[] = new int[numShards];
int previousAddedOrdinal[] = new int[numShards];

int addedCount = 0;
int removedCount = 0;
for(int i=0;i<=maxOrdinal;i++) {
int shardNumber = i & shardMask;
if(deltaAdditions.get(i)) {
addRecord(i, recordBitOffset[shardNumber], fixedLengthLongArray[shardNumber], varLengthByteArrays[shardNumber]);
recordBitOffset[shardNumber] += numBitsPerRecord;
int shardOrdinal = i / numShards;
VarInt.writeVInt(deltaAddedOrdinals[shardNumber], shardOrdinal - previousAddedOrdinal[shardNumber]);
addedCount++;
previousAddedOrdinal[shardNumber] = shardOrdinal;
} else if(fromCyclePopulated.get(i) && !toCyclePopulated.get(i)) {
int shardOrdinal = i / numShards;
VarInt.writeVInt(deltaRemovedOrdinals[shardNumber], shardOrdinal - previousRemovedOrdinal[shardNumber]);
removedCount++;
previousRemovedOrdinal[shardNumber] = shardOrdinal;
}
}

if (blobType.equals(HollowConsumer.Blob.BlobType.DELTA)) {
deltaAddedOrdinalCount = OptionalInt.of(addedCount);
deltaRemovedOrdinalCount = OptionalInt.of(removedCount);
}
}

private void writeCalculatedDelta(DataOutputStream os) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
*/
package com.netflix.hollow.core.write;

import static com.netflix.hollow.api.consumer.HollowConsumer.Blob.BlobType;

import com.netflix.hollow.api.consumer.HollowConsumer;
import com.netflix.hollow.core.memory.ByteData;
import com.netflix.hollow.core.memory.ByteDataArray;
import com.netflix.hollow.core.memory.ThreadSafeBitSet;
Expand All @@ -26,6 +29,7 @@
import com.netflix.hollow.core.schema.HollowSetSchema;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.OptionalInt;

public class HollowSetTypeWriteState extends HollowTypeWriteState {

Expand Down Expand Up @@ -290,7 +294,7 @@ private void writeSnapshotShard(DataOutputStream os, int shardNumber) throws IOE

@Override
public void calculateDelta() {
calculateDelta(previousCyclePopulated, currentCyclePopulated);
calculateDelta(previousCyclePopulated, currentCyclePopulated, BlobType.DELTA);
}

@Override
Expand All @@ -300,15 +304,15 @@ public void writeDelta(DataOutputStream dos) throws IOException {

@Override
public void calculateReverseDelta() {
calculateDelta(currentCyclePopulated, previousCyclePopulated);
calculateDelta(currentCyclePopulated, previousCyclePopulated, BlobType.REVERSE_DELTA);
}

@Override
public void writeReverseDelta(DataOutputStream dos) throws IOException {
writeCalculatedDelta(dos);
}

public void calculateDelta(ThreadSafeBitSet fromCyclePopulated, ThreadSafeBitSet toCyclePopulated) {
public void calculateDelta(ThreadSafeBitSet fromCyclePopulated, ThreadSafeBitSet toCyclePopulated, BlobType blobType) {
maxOrdinal = ordinalMap.maxOrdinal();
int bitsPerSetFixedLengthPortion = bitsPerSetSizeValue + bitsPerSetPointer;

Expand Down Expand Up @@ -352,6 +356,8 @@ public void calculateDelta(ThreadSafeBitSet fromCyclePopulated, ThreadSafeBitSet
if(getSchema().getHashKey() != null)
primaryKeyHasher = new HollowWriteStateEnginePrimaryKeyHasher(getSchema().getHashKey(), getStateEngine());

int addedCount = 0;
int removedCount = 0;
for(int ordinal=0;ordinal<=maxOrdinal;ordinal++) {
int shardNumber = ordinal & shardMask;
if(deltaAdditions.get(ordinal)) {
Expand Down Expand Up @@ -397,13 +403,20 @@ public void calculateDelta(ThreadSafeBitSet fromCyclePopulated, ThreadSafeBitSet

int shardOrdinal = ordinal / numShards;
VarInt.writeVInt(deltaAddedOrdinals[shardNumber], shardOrdinal - previousAddedOrdinal[shardNumber]);
addedCount++;
previousAddedOrdinal[shardNumber] = shardOrdinal;
} else if(fromCyclePopulated.get(ordinal) && !toCyclePopulated.get(ordinal)) {
int shardOrdinal = ordinal / numShards;
VarInt.writeVInt(deltaRemovedOrdinals[shardNumber], shardOrdinal - previousRemovedOrdinal[shardNumber]);
removedCount++;
previousRemovedOrdinal[shardNumber] = shardOrdinal;
}
}

if (blobType.equals(HollowConsumer.Blob.BlobType.DELTA)) {
deltaAddedOrdinalCount = OptionalInt.of(addedCount);
deltaRemovedOrdinalCount = OptionalInt.of(removedCount);
}
}

private void writeCalculatedDelta(DataOutputStream os) throws IOException {
Expand Down
Loading